Repository: falcon Updated Branches: refs/heads/master 6d4ff0bef -> 2d51db7a0
FALCON-1623 Implement safemode in Falcon Server Author: bvellanki <[email protected]> Reviewers: "yzheng-hortonworks <[email protected]>, Venkat Ranganathan <[email protected]>, Sowmya Ramesh <[email protected]>" Closes #116 from bvellanki/FALCON-1623 Project: http://git-wip-us.apache.org/repos/asf/falcon/repo Commit: http://git-wip-us.apache.org/repos/asf/falcon/commit/2d51db7a Tree: http://git-wip-us.apache.org/repos/asf/falcon/tree/2d51db7a Diff: http://git-wip-us.apache.org/repos/asf/falcon/diff/2d51db7a Branch: refs/heads/master Commit: 2d51db7a02394ce6856b4c622283f5afd99398eb Parents: 6d4ff0b Author: bvellanki <[email protected]> Authored: Tue May 3 11:23:43 2016 -0700 Committer: bvellanki <[email protected]> Committed: Tue May 3 11:23:43 2016 -0700 ---------------------------------------------------------------------- .../org/apache/falcon/cli/FalconAdminCLI.java | 14 ++ .../org/apache/falcon/FalconCLIConstants.java | 1 + .../org/apache/falcon/client/FalconClient.java | 67 ++++--- .../org/apache/falcon/entity/EntityUtil.java | 14 ++ .../apache/falcon/security/SecurityUtil.java | 1 + .../apache/falcon/util/StartupProperties.java | 52 +++++- .../workflow/engine/OozieWorkflowEngine.java | 19 ++ pom.xml | 2 +- .../java/org/apache/falcon/FalconServer.java | 24 ++- .../falcon/resource/AbstractEntityManager.java | 47 ++++- .../resource/AbstractInstanceManager.java | 54 +++--- .../AbstractSchedulableEntityManager.java | 30 +-- .../falcon/resource/admin/AdminResource.java | 40 ++++ .../resource/admin/AdminResourceTest.java | 36 ++-- .../falcon/rerun/handler/LateRerunConsumer.java | 5 +- src/bin/service-start.sh | 11 ++ .../apache/falcon/cli/FalconSafemodeCLIIT.java | 183 +++++++++++++++++++ 17 files changed, 517 insertions(+), 83 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java 
---------------------------------------------------------------------- diff --git a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java index 56cc5b9..84439b9 100644 --- a/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java +++ b/cli/src/main/java/org/apache/falcon/cli/FalconAdminCLI.java @@ -18,6 +18,7 @@ package org.apache.falcon.cli; +import com.sun.jersey.api.client.ClientResponse; import org.apache.commons.cli.CommandLine; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionGroup; @@ -55,6 +56,8 @@ public class FalconAdminCLI extends FalconCLI { "show the thread stack dump"); Option doAs = new Option(FalconCLIConstants.DO_AS_OPT, true, "doAs user"); + Option safemode = new Option(FalconCLIConstants.SAFE_MODE_OPT, true, + "set Falcon server safemode (true|false)"); Option help = new Option("help", false, "show Falcon help"); Option debug = new Option(FalconCLIConstants.DEBUG_OPTION, false, "Use debug mode to see debugging statements on stdout"); @@ -62,6 +65,7 @@ public class FalconAdminCLI extends FalconCLI { group.addOption(version); group.addOption(stack); group.addOption(help); + group.addOption(safemode); adminOptions.addOptionGroup(group); adminOptions.addOption(doAs); @@ -102,6 +106,16 @@ public class FalconAdminCLI extends FalconCLI { } else if (optionsList.contains(FalconCLIConstants.VERSION_OPT)) { result = client.getVersion(doAsUser); OUT.get().println("Falcon server build version: " + result); + } else if (optionsList.contains(FalconCLIConstants.SAFE_MODE_OPT)) { + String safemode = commandLine.getOptionValue(FalconCLIConstants.SAFE_MODE_OPT); + ClientResponse response = client.setSafemode(safemode, doAsUser); + if (response.getStatus() == 200) { + OUT.get().println("Falcon server safemode set to : " + safemode); + } else { + ERR.get().println("Unable to set Falcon server to safemode value : " + safemode); + ERR.get().println(response.toString()); + exitValue = -1; + } } else if 
(optionsList.contains(FalconCLIConstants.HELP_CMD)) { OUT.get().println("Falcon Help"); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java index 436875d..1db5cfe 100644 --- a/client/src/main/java/org/apache/falcon/FalconCLIConstants.java +++ b/client/src/main/java/org/apache/falcon/FalconCLIConstants.java @@ -35,6 +35,7 @@ public final class FalconCLIConstants { public static final String ENTITY_CMD = "entity"; public static final String INSTANCE_CMD = "instance"; public static final String EXTENSION_CMD = "extension"; + public static final String SAFE_MODE_OPT = "setsafemode"; public static final String TYPE_OPT = "type"; public static final String COLO_OPT = "colo"; http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/client/src/main/java/org/apache/falcon/client/FalconClient.java ---------------------------------------------------------------------- diff --git a/client/src/main/java/org/apache/falcon/client/FalconClient.java b/client/src/main/java/org/apache/falcon/client/FalconClient.java index 36fb873..7a48973 100644 --- a/client/src/main/java/org/apache/falcon/client/FalconClient.java +++ b/client/src/main/java/org/apache/falcon/client/FalconClient.java @@ -18,30 +18,11 @@ package org.apache.falcon.client; -import java.io.BufferedReader; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.FileReader; -import java.io.IOException; -import java.io.InputStream; -import java.io.PrintStream; -import java.io.UnsupportedEncodingException; -import java.net.URL; -import java.security.SecureRandom; -import java.util.List; -import java.util.Properties; -import java.util.concurrent.atomic.AtomicReference; - -import javax.net.ssl.HostnameVerifier; 
-import javax.net.ssl.HttpsURLConnection; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.ws.rs.HttpMethod; -import javax.ws.rs.core.MediaType; -import javax.ws.rs.core.Response; -import javax.ws.rs.core.UriBuilder; - +import com.sun.jersey.api.client.Client; +import com.sun.jersey.api.client.ClientResponse; +import com.sun.jersey.api.client.WebResource; +import com.sun.jersey.api.client.config.DefaultClientConfig; +import com.sun.jersey.client.urlconnection.HTTPSProperties; import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.net.util.TrustManagerUtils; @@ -68,11 +49,28 @@ import org.apache.hadoop.security.authentication.client.AuthenticatedURL; import org.apache.hadoop.security.authentication.client.KerberosAuthenticator; import org.apache.hadoop.security.authentication.client.PseudoAuthenticator; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HTTPSProperties; +import javax.net.ssl.HostnameVerifier; +import javax.net.ssl.HttpsURLConnection; +import javax.net.ssl.SSLContext; +import javax.net.ssl.SSLSession; +import javax.net.ssl.TrustManager; +import javax.ws.rs.HttpMethod; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; +import javax.ws.rs.core.UriBuilder; +import java.io.BufferedReader; +import java.io.FileInputStream; +import java.io.FileNotFoundException; +import java.io.FileReader; +import java.io.IOException; +import java.io.InputStream; +import java.io.PrintStream; +import java.io.UnsupportedEncodingException; +import java.net.URL; +import java.security.SecureRandom; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.atomic.AtomicReference; /** * Client API to submit and manage Falcon 
Entities (Cluster, Feed, Process) jobs @@ -318,7 +316,8 @@ public class FalconClient extends AbstractFalconClient { protected static enum AdminOperations { STACK("api/admin/stack", HttpMethod.GET, MediaType.TEXT_PLAIN), - VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON); + VERSION("api/admin/version", HttpMethod.GET, MediaType.APPLICATION_JSON), + SAFEMODE("api/admin/setSafeMode", HttpMethod.GET, MediaType.APPLICATION_JSON); private String path; private String method; @@ -706,6 +705,14 @@ public class FalconClient extends AbstractFalconClient { return clientResponse.getStatus(); } + public ClientResponse setSafemode(String safemode, String doAsUser) throws FalconCLIException { + AdminOperations job = AdminOperations.SAFEMODE; + ClientResponse clientResponse = new ResourceBuilder().path(job.path).path(safemode) + .addQueryParam(DO_AS_OPT, doAsUser).call(job); + printClientResponse(clientResponse); + return clientResponse; + } + public String getDimensionList(String dimensionType, String cluster, String doAsUser) throws FalconCLIException { return sendMetadataDiscoveryRequest(MetadataOperations.LIST, dimensionType, null, cluster, doAsUser); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java index 8825a65..b181ece 100644 --- a/common/src/main/java/org/apache/falcon/entity/EntityUtil.java +++ b/common/src/main/java/org/apache/falcon/entity/EntityUtil.java @@ -124,6 +124,20 @@ public final class EntityUtil { } } + /** + * List of entity operations. 
+ */ + public enum ENTITY_OPERATION { + SUBMIT, + UPDATE, + SCHEDULE, + SUBMIT_AND_SCHEDULE, + DELETE, + SUSPEND, + RESUME, + TOUCH + } + private EntityUtil() {} public static <T extends Entity> T getEntity(EntityType type, String entityName) throws FalconException { http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java index fe04c40..7191f72 100644 --- a/common/src/main/java/org/apache/falcon/security/SecurityUtil.java +++ b/common/src/main/java/org/apache/falcon/security/SecurityUtil.java @@ -141,4 +141,5 @@ public final class SecurityUtil { } } } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/common/src/main/java/org/apache/falcon/util/StartupProperties.java ---------------------------------------------------------------------- diff --git a/common/src/main/java/org/apache/falcon/util/StartupProperties.java b/common/src/main/java/org/apache/falcon/util/StartupProperties.java index 7522b0d..92ffa04 100644 --- a/common/src/main/java/org/apache/falcon/util/StartupProperties.java +++ b/common/src/main/java/org/apache/falcon/util/StartupProperties.java @@ -19,7 +19,13 @@ package org.apache.falcon.util; import org.apache.falcon.FalconException; +import org.apache.falcon.hadoop.HadoopClientFactory; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import java.io.IOException; import java.util.Properties; import java.util.concurrent.atomic.AtomicReference; @@ -28,7 +34,14 @@ import java.util.concurrent.atomic.AtomicReference; */ public final class StartupProperties extends ApplicationProperties { + public static final String SAFEMODE_PROPERTY = "falcon.safeMode"; + private static final 
String SAFEMODE_FILE = ".safemode"; + private static final String CONFIGSTORE_PROPERTY = "config.store.uri"; + private static FileSystem fileSystem; + private static Path storePath; + private static final String PROPERTY_FILE = "startup.properties"; + private static final Logger LOG = LoggerFactory.getLogger(StartupProperties.class); private static final AtomicReference<StartupProperties> INSTANCE = new AtomicReference<StartupProperties>(); @@ -46,10 +59,47 @@ public final class StartupProperties extends ApplicationProperties { try { if (INSTANCE.get() == null) { INSTANCE.compareAndSet(null, new StartupProperties()); + storePath = new Path((INSTANCE.get().getProperty(CONFIGSTORE_PROPERTY))); + fileSystem = HadoopClientFactory.get().createFalconFileSystem(storePath.toUri()); + String isSafeMode = (doesSafemodeFileExist()) ? "true" : "false"; + LOG.info("Initializing Falcon StartupProperties with safemode set to {}.", isSafeMode); + INSTANCE.get().setProperty(SAFEMODE_PROPERTY, isSafeMode); } return INSTANCE.get(); } catch (FalconException e) { - throw new RuntimeException("Unable to read application " + "startup properties", e); + throw new RuntimeException("Unable to read application startup properties", e); + } catch (IOException e) { + throw new RuntimeException("Unable to verify Falcon safemode", e); } } + + public static void createSafemodeFile() throws IOException { + Path safemodeFilePath = getSafemodeFilePath(); + if (!doesSafemodeFileExist()) { + boolean success = fileSystem.createNewFile(safemodeFilePath); + if (!success) { + LOG.error("Failed to create safemode file at {}", safemodeFilePath.toUri()); + throw new IOException("Failed to create safemode file at " + safemodeFilePath.toUri()); + } + } + INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "true"); + } + + public static boolean deleteSafemodeFile() throws IOException { + INSTANCE.get().setProperty(SAFEMODE_PROPERTY, "false"); + return !doesSafemodeFileExist() || 
fileSystem.delete(getSafemodeFilePath(), true); + } + + public static boolean doesSafemodeFileExist() throws IOException { + return fileSystem.exists(getSafemodeFilePath()); + } + + private static Path getSafemodeFilePath() { + return new Path(storePath, SAFEMODE_FILE); + } + + public static boolean isServerInSafeMode() { + return Boolean.parseBoolean(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false")); + } + } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java ---------------------------------------------------------------------- diff --git a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java index 05d5ef9..6b87b38 100644 --- a/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java +++ b/oozie/src/main/java/org/apache/falcon/workflow/engine/OozieWorkflowEngine.java @@ -150,6 +150,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public void schedule(Entity entity, Boolean skipDryRun, Map<String, String> suppliedProps) throws FalconException { + if (StartupProperties.isServerInSafeMode()) { + throwSafemodeException("SCHEDULE"); + } Map<String, BundleJob> bundleMap = findLatestBundle(entity); List<String> schedClusters = new ArrayList<String>(); for (Map.Entry<String, BundleJob> entry : bundleMap.entrySet()) { @@ -181,6 +184,12 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } } + private void throwSafemodeException(String operation) throws FalconException { + String error = "Workflow Engine does not allow " + operation + " operation when Falcon server is in safemode"; + LOG.error(error); + throw new FalconException(error); + } + /** * Prepare the staging and logs dir for this entity with default permissions. 
* @@ -204,6 +213,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { @Override public void dryRun(Entity entity, String clusterName, Boolean skipDryRun) throws FalconException { + if (StartupProperties.isServerInSafeMode()) { + throwSafemodeException("DRYRUN"); + } OozieEntityBuilder builder = OozieEntityBuilder.get(entity); Path buildPath = new Path("/tmp", "falcon" + entity.getName() + System.currentTimeMillis()); Cluster cluster = STORE.get(EntityType.CLUSTER, clusterName); @@ -413,6 +425,9 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { } private String doBundleAction(Entity entity, BundleAction action) throws FalconException { + if (StartupProperties.isServerInSafeMode() && !action.equals(BundleAction.SUSPEND)) { + throwSafemodeException(action.name()); + } Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); String result = null; for (String cluster : clusters) { @@ -637,6 +652,10 @@ public class OozieWorkflowEngine extends AbstractWorkflowEngine { private InstancesResult doJobAction(JobAction action, Entity entity, Date start, Date end, Properties props, List<LifeCycle> lifeCycles) throws FalconException { + if (StartupProperties.isServerInSafeMode() + && (action.equals(JobAction.RERUN) || action.equals(JobAction.RESUME))) { + throwSafemodeException(action.name()); + } return doJobAction(action, entity, start, end, props, lifeCycles, null); } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index a72889e..54863b2 100644 --- a/pom.xml +++ b/pom.xml @@ -122,7 +122,7 @@ <activeByDefault>true</activeByDefault> </activation> <properties> - <hadoop.version>2.7.1</hadoop.version> + <hadoop.version>2.6.2</hadoop.version> </properties> <dependencyManagement> <dependencies> http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/FalconServer.java 
---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/FalconServer.java b/prism/src/main/java/org/apache/falcon/FalconServer.java index ea341b3..3d9879a 100644 --- a/prism/src/main/java/org/apache/falcon/FalconServer.java +++ b/prism/src/main/java/org/apache/falcon/FalconServer.java @@ -27,9 +27,9 @@ import org.apache.commons.cli.ParseException; import org.apache.commons.lang3.StringUtils; import org.apache.falcon.util.BuildProperties; import org.apache.falcon.util.EmbeddedServer; +import org.apache.falcon.util.StartupProperties; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.falcon.util.StartupProperties; /** * Driver for running Falcon as a standalone server with embedded jetty server. @@ -38,6 +38,8 @@ public final class FalconServer { private static final Logger LOG = LoggerFactory.getLogger(FalconServer.class); private static final String APP_PATH = "app"; private static final String APP_PORT = "port"; + private static final String SAFE_MODE = "setsafemode"; + private static EmbeddedServer server; private static BrokerService broker; @@ -59,6 +61,10 @@ public final class FalconServer { opt.setRequired(false); options.addOption(opt); + opt = new Option(SAFE_MODE, true, "Application mode, start safemode if true"); + opt.setRequired(false); + options.addOption(opt); + return new GnuParser().parse(options, args); } @@ -88,6 +94,16 @@ public final class FalconServer { appPath = cmd.getOptionValue(APP_PATH); } + if (cmd.hasOption(SAFE_MODE)) { + validateSafemode(cmd.getOptionValue(SAFE_MODE)); + boolean isSafeMode = Boolean.parseBoolean(cmd.getOptionValue(SAFE_MODE)); + if (isSafeMode) { + StartupProperties.createSafemodeFile(); + } else { + StartupProperties.deleteSafemodeFile(); + } + } + final String enableTLSFlag = StartupProperties.get().getProperty("falcon.enableTLS"); final int appPort = getApplicationPort(cmd, enableTLSFlag); final boolean enableTLS = 
isTLSEnabled(enableTLSFlag, appPort); @@ -102,6 +118,12 @@ public final class FalconServer { server.start(); } + private static void validateSafemode(String isSafeMode) throws Exception { + if (!("true".equals(isSafeMode) || "false".equals(isSafeMode))) { + throw new Exception("Invalid value for argument safemode. Allowed values are \"true\" or \"false\""); + } + } + private static int getApplicationPort(CommandLine cmd, String enableTLSFlag) { final int appPort; if (cmd.hasOption(APP_PORT)) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java index c119f23..b319dd1 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractEntityManager.java @@ -46,6 +46,7 @@ import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.util.RuntimeProperties; +import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.io.IOUtils; @@ -226,8 +227,9 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { Entity entity = deserializeEntity(inputStream, entityType); validate(entity); - //Validate that the entity can be scheduled in the cluster - if (entity.getEntityType().isSchedulable()) { + // Validate that the entity can be scheduled in the cluster. + // Perform dryrun only if falcon is not in safemode. 
+ if (entity.getEntityType().isSchedulable() && !StartupProperties.isServerInSafeMode()) { Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { try { @@ -266,6 +268,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { String removedFromEngine = ""; try { Entity entityObj = EntityUtil.getEntity(type, entity); + verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.DELETE); canRemove(entityObj); obtainEntityLocks(entityObj, "delete", tokenList); @@ -307,6 +310,7 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { try { EntityType entityType = EntityType.getEnum(type); Entity entity = deserializeEntity(inputStream, entityType); + verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.UPDATE); return update(entity, type, entityName, skipDryRun); } catch (IOException | FalconException e) { LOG.error("Update failed", e); @@ -435,10 +439,49 @@ public abstract class AbstractEntityManager extends AbstractMetadataResource { throws IOException, FalconException { EntityType entityType = EntityType.getEnum(type); Entity entity = deserializeEntity(inputStream, entityType); + verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.SUBMIT); submitInternal(entity, doAsUser); return entity; } + protected void verifySafemodeOperation(Entity entity, EntityUtil.ENTITY_OPERATION operation) { + // if Falcon not in safemode, return + if (!StartupProperties.isServerInSafeMode()) { + return; + } + + switch (operation) { + case UPDATE: + if (entity.getEntityType().equals(EntityType.CLUSTER)) { + return; + } else { + LOG.error("Entity operation {} is not allowed on non-cluster entities during safemode", + operation.name()); + throw FalconWebException.newAPIException("Entity operation " + operation.name() + + " is only allowed on cluster entities during safemode"); + } + case SUSPEND: + if (entity.getEntityType().equals(EntityType.CLUSTER)) { + LOG.error("Entity 
operation {} is not allowed on cluster entity", + operation.name()); + throw FalconWebException.newAPIException("Entity operation " + operation.name() + + " is not allowed on cluster entity"); + } else { + return; + } + case SCHEDULE: + case SUBMIT_AND_SCHEDULE: + case DELETE: + case RESUME: + case TOUCH: + case SUBMIT: + default: + LOG.error("Entity operation {} is not allowed during safemode", operation.name()); + throw FalconWebException.newAPIException("Entity operation " + + operation.name() + " not allowed during safemode"); + } + } + protected synchronized void submitInternal(Entity entity, String doAsUser) throws IOException, FalconException { EntityType entityType = entity.getEntityType(); List<Entity> tokenList = new ArrayList<>(); http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java index ba183c8..528ff98 100644 --- a/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractInstanceManager.java @@ -18,25 +18,6 @@ package org.apache.falcon.resource; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Calendar; -import java.util.Collections; -import java.util.Comparator; -import java.util.Date; -import java.util.HashMap; -import java.util.HashSet; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.Properties; -import java.util.Queue; -import java.util.Set; - -import javax.servlet.ServletInputStream; -import javax.servlet.http.HttpServletRequest; - import com.thinkaurelius.titan.core.TitanMultiVertexQuery; import com.thinkaurelius.titan.core.TitanVertex; import 
com.thinkaurelius.titan.graphdb.blueprints.TitanBlueprintsGraph; @@ -69,11 +50,29 @@ import org.apache.falcon.metadata.RelationshipType; import org.apache.falcon.resource.InstancesResult.Instance; import org.apache.falcon.resource.InstancesSummaryResult.InstanceSummary; import org.apache.falcon.util.DeploymentUtil; +import org.apache.falcon.util.StartupProperties; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.ServletInputStream; +import javax.servlet.http.HttpServletRequest; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Calendar; +import java.util.Collections; +import java.util.Comparator; +import java.util.Date; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.Queue; +import java.util.Set; + /** * A base class for managing Entity's Instance operations. 
*/ @@ -616,7 +615,8 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } public InstancesResult resumeInstance(HttpServletRequest request, String type, String entity, String startStr, - String endStr, String colo, List<LifeCycle> lifeCycles) { + String endStr, String colo, + List<LifeCycle> lifeCycles) { Properties props = getProperties(request); return resumeInstance(props, type, entity, startStr, endStr, colo, lifeCycles); } @@ -625,6 +625,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { String colo, List<LifeCycle> lifeCycles) { checkColo(colo); checkType(type); + if (StartupProperties.isServerInSafeMode()) { + throwSafemodeException("RESUME"); + } try { lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type); validateParams(type, entity); @@ -912,6 +915,9 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { String colo, List<LifeCycle> lifeCycles, Boolean isForced) { checkColo(colo); checkType(type); + if (StartupProperties.isServerInSafeMode()) { + throwSafemodeException("RERUN"); + } try { lifeCycles = checkAndUpdateLifeCycle(lifeCycles, type); validateParams(type, entity); @@ -1066,4 +1072,10 @@ public abstract class AbstractInstanceManager extends AbstractEntityManager { } return earliestDate; } + + private void throwSafemodeException(String operation) { + String error = "Instance operation " + operation + " cannot be performed when server is in safemode"; + LOG.error(error); + throw FalconWebException.newAPIException(error); + } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java index 864381a..c6903a4 100644 --- 
a/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java +++ b/prism/src/main/java/org/apache/falcon/resource/AbstractSchedulableEntityManager.java @@ -18,19 +18,6 @@ package org.apache.falcon.resource; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Date; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - -import javax.servlet.http.HttpServletRequest; -import javax.ws.rs.PathParam; -import javax.ws.rs.QueryParam; -import javax.ws.rs.core.Context; - import org.apache.commons.lang3.StringUtils; import org.apache.falcon.FalconException; import org.apache.falcon.FalconWebException; @@ -48,10 +35,21 @@ import org.apache.falcon.service.FeedSLAMonitoringService; import org.apache.falcon.util.DeploymentUtil; import org.apache.falcon.workflow.WorkflowEngineFactory; import org.apache.hadoop.security.authorize.AuthorizationException; - import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import javax.servlet.http.HttpServletRequest; +import javax.ws.rs.PathParam; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.Context; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + /** * REST resource of allowed actions on Schedulable Entities, Only Process and * Feed can have schedulable actions. 
@@ -94,6 +92,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM Entity entityObj = null; try { entityObj = EntityUtil.getEntity(type, entity); + verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SCHEDULE); //first acquire lock on entity before scheduling if (!memoryLocks.acquireLock(entityObj, "schedule")) { throw FalconWebException.newAPIException("Looks like an schedule/update command is already" @@ -221,6 +220,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM try { checkSchedulableEntity(type); Entity entityObj = EntityUtil.getEntity(type, entity); + verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.SUSPEND); if (getWorkflowEngine(entityObj).isActive(entityObj)) { getWorkflowEngine(entityObj).suspend(entityObj); } else { @@ -249,6 +249,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM try { checkSchedulableEntity(type); Entity entityObj = EntityUtil.getEntity(type, entity); + verifySafemodeOperation(entityObj, EntityUtil.ENTITY_OPERATION.RESUME); if (getWorkflowEngine(entityObj).isActive(entityObj)) { getWorkflowEngine(entityObj).resume(entityObj); } else { @@ -355,6 +356,7 @@ public abstract class AbstractSchedulableEntityManager extends AbstractInstanceM StringBuilder result = new StringBuilder(); try { Entity entity = EntityUtil.getEntity(type, entityName); + verifySafemodeOperation(entity, EntityUtil.ENTITY_OPERATION.TOUCH); decorateEntityWithACL(entity); Set<String> clusters = EntityUtil.getClustersDefinedInColos(entity); for (String cluster : clusters) { http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java ---------------------------------------------------------------------- diff --git a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java index 
a75c97c..081141b 100644 --- a/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java +++ b/prism/src/main/java/org/apache/falcon/resource/admin/AdminResource.java @@ -19,6 +19,7 @@ package org.apache.falcon.resource.admin; import org.apache.commons.lang3.StringUtils; +import org.apache.falcon.FalconWebException; import org.apache.falcon.security.CurrentUser; import org.apache.falcon.security.SecurityUtil; import org.apache.falcon.util.BuildProperties; @@ -26,6 +27,8 @@ import org.apache.falcon.util.DeploymentProperties; import org.apache.falcon.util.RuntimeProperties; import org.apache.falcon.util.StartupProperties; import org.apache.hadoop.util.VersionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import javax.servlet.http.Cookie; import javax.servlet.http.HttpServletResponse; @@ -35,9 +38,11 @@ import javax.ws.rs.PathParam; import javax.ws.rs.Produces; import javax.ws.rs.core.Context; import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.Response; import javax.xml.bind.annotation.XmlAccessType; import javax.xml.bind.annotation.XmlAccessorType; import javax.xml.bind.annotation.XmlRootElement; +import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Properties; @@ -47,6 +52,8 @@ import java.util.Properties; */ @Path("admin") public class AdminResource { + public static final String SAFEMODE = "safemode"; + private static final Logger LOG = LoggerFactory.getLogger(AdminResource.class); /** * Get stack trace of the falcon server. 
@@ -107,6 +114,11 @@ public class AdminResource { property.value = StartupProperties.get().getProperty("falcon.authentication.type", "simple"); props.add(property); + property = new Property(); + property.key = SAFEMODE; + property.value = StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"); + props.add(property); + version = new PropertyList(); version.properties = props; } @@ -115,6 +127,34 @@ public class AdminResource { } /** + * Set safemode for falcon server. + * + * @param mode Set safemode to true/false based on mode. + * @return Current value of the safemode startup property. + */ + @GET + @Path("setSafeMode/{mode}") + @Produces({MediaType.TEXT_XML, MediaType.APPLICATION_JSON}) + public String setSafeMode(@PathParam("mode") String mode) { + LOG.info("Setting falcon server safemode property to: {}", mode); + try { + if ("true".equalsIgnoreCase(mode)) { + StartupProperties.createSafemodeFile(); + } else if ("false".equalsIgnoreCase(mode)) { + StartupProperties.deleteSafemodeFile(); + } else { + LOG.error("Bad request, Invalid value for setsafemode : {}", mode); + throw FalconWebException.newAPIException("Invalid value \"" + mode + "\" provided for safemode.", + Response.Status.BAD_REQUEST); + } + } catch (IOException e) { + LOG.error("Unable to manage safemode file in Falcon Server {} ", e.getMessage()); + throw FalconWebException.newAPIException(e.getMessage(), Response.Status.BAD_REQUEST); + } + return StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"); + } + + /** * Get configuration information of the falcon server. * @param type config-type can be build, deploy, startup or runtime * @return Configuration information of the server.
http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java ---------------------------------------------------------------------- diff --git a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java index ea093c7..7447adf 100644 --- a/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java +++ b/prism/src/test/java/org/apache/falcon/resource/admin/AdminResourceTest.java @@ -38,20 +38,34 @@ public class AdminResourceTest { @Test public void testAdminVersion() throws Exception { + checkProperty("authentication", "simple"); + StartupProperties.get().setProperty("falcon.authentication.type", "kerberos"); + checkProperty("authentication", "kerberos"); + StartupProperties.get().setProperty("falcon.authentication.type", "simple"); + } + + @Test + public void testSetSafemode() throws Exception { + checkProperty(AdminResource.SAFEMODE, "false"); + AdminResource resource = new AdminResource(); - AdminResource.PropertyList propertyList = resource.getVersion(); - for(AdminResource.Property property : propertyList.properties) { - if (property.key.equalsIgnoreCase("authentication")) { - Assert.assertEquals(property.value, "simple"); - } - } + String safemode = resource.setSafeMode("true"); + Assert.assertEquals(safemode, "true"); + Assert.assertTrue(StartupProperties.doesSafemodeFileExist()); + checkProperty(AdminResource.SAFEMODE, "true"); - StartupProperties.get().setProperty("falcon.authentication.type", "kerberos"); - resource = new AdminResource(); - propertyList = resource.getVersion(); + safemode = resource.setSafeMode("false"); + Assert.assertEquals(safemode, "false"); + Assert.assertFalse(StartupProperties.doesSafemodeFileExist()); + checkProperty(AdminResource.SAFEMODE, "false"); + } + + private void checkProperty(String propertyName, String expectedVal) { + AdminResource 
resource = new AdminResource(); + AdminResource.PropertyList propertyList = resource.getVersion(); for(AdminResource.Property property : propertyList.properties) { - if (property.key.equalsIgnoreCase("authentication")) { - Assert.assertEquals(property.value, "kerberos"); + if (property.key.equalsIgnoreCase(propertyName)) { + Assert.assertEquals(property.value, expectedVal); } } } http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java ---------------------------------------------------------------------- diff --git a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java index 047fa0f..98db379 100644 --- a/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java +++ b/rerun/src/main/java/org/apache/falcon/rerun/handler/LateRerunConsumer.java @@ -25,9 +25,10 @@ import org.apache.falcon.entity.v0.Entity; import org.apache.falcon.entity.v0.SchemaHelper; import org.apache.falcon.entity.v0.process.LateInput; import org.apache.falcon.hadoop.HadoopClientFactory; -import org.apache.falcon.workflow.LateDataHandler; import org.apache.falcon.rerun.event.LaterunEvent; import org.apache.falcon.rerun.queue.DelayedQueue; +import org.apache.falcon.util.StartupProperties; +import org.apache.falcon.workflow.LateDataHandler; import org.apache.falcon.workflow.WorkflowExecutionArgs; import org.apache.falcon.workflow.engine.AbstractWorkflowEngine; import org.apache.hadoop.conf.Configuration; @@ -58,7 +59,7 @@ public class LateRerunConsumer<T extends LateRerunHandler<DelayedQueue<LaterunEv LaterunEvent message, String entityType, String entityName) { try { if (jobStatus.equals("RUNNING") || jobStatus.equals("PREP") - || jobStatus.equals("SUSPENDED")) { + || jobStatus.equals("SUSPENDED") || StartupProperties.isServerInSafeMode()) { LOG.debug("Re-enqueing message in LateRerunHandler for workflow 
with same delay as " + "job status is {} for : {}", jobStatus, message.getWfId()); message.setMsgInsertTime(System.currentTimeMillis()); http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/src/bin/service-start.sh ---------------------------------------------------------------------- diff --git a/src/bin/service-start.sh b/src/bin/service-start.sh index 4766130..f11dce0 100755 --- a/src/bin/service-start.sh +++ b/src/bin/service-start.sh @@ -13,6 +13,17 @@ # limitations under the License. See accompanying LICENSE file. # +# validate args +args=("$@") +for ((i=0; i < $#; i++)) { + if [ "-setsafemode" == "${args[$i]}" ]; then + if [ "false" != "${args[$i+1]}" ] && [ "true" != "${args[$i+1]}" ]; then + echo "Invalid argument for option -safemode. Acceptable values are true or false." + exit 1 + fi + fi +} + # resolve links - $0 may be a softlink PRG="${0}" http://git-wip-us.apache.org/repos/asf/falcon/blob/2d51db7a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java ---------------------------------------------------------------------- diff --git a/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java new file mode 100644 index 0000000..f640a69 --- /dev/null +++ b/webapp/src/test/java/org/apache/falcon/cli/FalconSafemodeCLIIT.java @@ -0,0 +1,183 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.falcon.cli; + +import org.apache.falcon.entity.v0.SchemaHelper; +import org.apache.falcon.resource.TestContext; +import org.apache.falcon.util.FalconTestUtil; +import org.apache.falcon.util.StartupProperties; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import java.io.OutputStream; +import java.io.PrintStream; +import java.util.Date; +import java.util.Map; + +/** + * Integration test for Falcon CLI commands while the server is in safemode. + */ +@Test(groups = {"exhaustive"}) +public class FalconSafemodeCLIIT { + private InMemoryWriter stream = new InMemoryWriter(System.out); + private TestContext context = new TestContext(); + private Map<String, String> overlay; + private static final String START_INSTANCE = "2012-04-20T00:00Z"; + + @BeforeClass + public void prepare() throws Exception { + TestContext.prepare(); + FalconCLI.OUT.set(stream); + initSafemode(); + } + + @AfterClass + public void tearDown() throws Exception { + clearSafemode(); + TestContext.deleteEntitiesFromStore(); + + } + + private void initSafemode() throws Exception { + overlay = context.getUniqueOverlay(); + // Submit one cluster + String filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), 0); + context.setCluster(overlay.get("cluster")); + Assert.assertEquals(stream.buffer.toString().trim(), + "falcon/default/Submit successful (cluster) " + context.getClusterName()); + +
// Submit and schedule one feed + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE1, overlay); + Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type feed -file " + filePath), 0); + + // Check the status of the scheduled feed + Assert.assertEquals(executeWithURL("entity -status -type feed -name " + overlay.get("inputFeedName")), 0); + + // Test the lookup command + Assert.assertEquals(executeWithURL("entity -lookup -type feed -path " + + "/falcon/test/input/2014/11/23/23"), 0); + + // Set safemode + Assert.assertEquals(new FalconCLI().run(("admin -setsafemode true -url " + + TestContext.BASE_URL).split("\\s")), 0); + } + + private void clearSafemode() throws Exception { + Assert.assertEquals(new FalconCLI().run(("admin -setsafemode false -url " + + TestContext.BASE_URL).split("\\s")), 0); + Assert.assertEquals(StartupProperties.get().getProperty(StartupProperties.SAFEMODE_PROPERTY, "false"), + "false"); + } + + public void testEntityCommandsNotAllowedInSafeMode() throws Exception { + String filePath; + + filePath = TestContext.overlayParametersOverTemplate(TestContext.CLUSTER_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type cluster -file " + filePath), -1); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type feed -file " + filePath), -1); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submit -type process -doAs " + FalconTestUtil.TEST_USER_2 + + " -file " + filePath), -1); + + filePath = TestContext.overlayParametersOverTemplate(context.getClusterFileTemplate(), overlay); + Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type cluster -file " + filePath), -1); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.FEED_TEMPLATE2, overlay); + Assert.assertEquals(executeWithURL("entity
-submitAndSchedule -type feed -doAs " + FalconTestUtil.TEST_USER_2 + + " -file " + filePath), -1); + + filePath = TestContext.overlayParametersOverTemplate(TestContext.PROCESS_TEMPLATE, overlay); + Assert.assertEquals(executeWithURL("entity -submitAndSchedule -type process -file " + filePath), -1); + + Assert.assertEquals(executeWithURL("entity -update -name " + overlay.get("processName") + + " -type process -file " + filePath), -1); + + Assert.assertEquals(executeWithURL("entity -touch -name " + overlay.get("processName") + + " -type process"), -1); + + Assert.assertEquals(executeWithURL("entity -schedule -type feed " + + " -name " + overlay.get("inputFeedName")), -1); + + Assert.assertEquals(executeWithURL("entity -resume -type feed " + + " -name " + overlay.get("inputFeedName")), -1); + + Assert.assertEquals(executeWithURL("entity -delete -type feed -name " + overlay.get("inputFeedName")), -1); + + + } + + public void testEntityCommandsAllowedInSafeMode() throws Exception { + // Allow definition, summary, list, suspend operations + Assert.assertEquals(executeWithURL("entity -definition -type cluster -name " + overlay.get("cluster")), 0); + + Assert.assertEquals(executeWithURL("entity -suspend -type feed " + + " -name " + overlay.get("inputFeedName")), 0); + + Assert.assertEquals(executeWithURL("entity -summary -type feed -cluster "+ overlay.get("cluster") + + " -fields status,tags -start " + START_INSTANCE + + " -filterBy TYPE:FEED -orderBy name -sortOrder asc " + + " -offset 0 -numResults 1 -numInstances 5"), 0); + + Assert.assertEquals(executeWithURL("instance -list -type feed " + + " -name " + overlay.get("inputFeedName") + " -start " + + SchemaHelper.getDateFormat().format(new Date())), 0); + + Assert.assertEquals(executeWithURL("instance -kill -type feed -name " + + overlay.get("inputFeedName") + + " -start " + START_INSTANCE + " -end " + START_INSTANCE), 0); + + } + + private int executeWithURL(String command) throws Exception { + 
FalconCLI.OUT.get().print("COMMAND IS "+command + " -url " + TestContext.BASE_URL + "\n"); + return new FalconCLI() + .run((command + " -url " + TestContext.BASE_URL).split("\\s+")); + } + + private static class InMemoryWriter extends PrintStream { + + private StringBuffer buffer = new StringBuffer(); + + public InMemoryWriter(OutputStream out) { + super(out); + } + + @Override + public void println(String x) { + clear(); + buffer.append(x); + super.println(x); + } + + @SuppressWarnings("UnusedDeclaration") + public String getBuffer() { + return buffer.toString(); + } + + public void clear() { + buffer.delete(0, buffer.length()); + } + } +}
