Repository: incubator-gobblin Updated Branches: refs/heads/master 0e5561519 -> c385f1ddd
[GOBBLIN-304] Change default version of flow specs to null. Closes #2159 from arjun4084346/removeSpecVersion Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c385f1dd Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c385f1dd Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c385f1dd Branch: refs/heads/master Commit: c385f1dddb0124f0ff2545d9a591f3b055f76080 Parents: 0e55615 Author: Arjun <[email protected]> Authored: Tue Nov 7 14:04:53 2017 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Nov 7 14:04:53 2017 -0800 ---------------------------------------------------------------------- .../apache/gobblin/service/FlowConfigTest.java | 12 +-- .../gobblin/service/FlowConfigsResource.java | 40 +-------- .../apache/gobblin/runtime/api/FlowSpec.java | 7 +- .../runtime/spec_catalog/FlowCatalog.java | 17 ++-- .../runtime/spec_catalog/TopologyCatalog.java | 15 +--- .../gobblin/runtime/spec_store/FSSpecStore.java | 92 +++++--------------- .../gobblin/spec_catalog/FlowCatalogTest.java | 7 +- .../spec_catalog/TopologyCatalogTest.java | 3 +- .../service/modules/core/GitConfigMonitor.java | 7 +- .../modules/core/GobblinServiceHATest.java | 12 +-- .../modules/core/GobblinServiceManagerTest.java | 25 ++---- .../core/IdentityFlowToJobSpecCompilerTest.java | 4 +- .../MultiHopsFlowToJobSpecCompilerTest.java | 4 +- .../modules/orchestration/OrchestratorTest.java | 2 +- 14 files changed, 70 insertions(+), 177 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java index 9ac7d37..a373762 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-client/src/test/java/org/apache/gobblin/service/FlowConfigTest.java @@ -171,11 +171,8 @@ public class FlowConfigTest { try { _client.createFlowConfig(flowConfig); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.S_409_CONFLICT.getCode()); - return; + Assert.fail("Create Again should pass without complaining that the spec already exists."); } - - Assert.fail("Get should have gotten a 409 error"); } @Test (dependsOnMethods = "testCreateAgain") @@ -187,7 +184,7 @@ public class FlowConfigTest { Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME); Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE ); Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI); - Assert.assertTrue(flowConfig.getSchedule().isRunImmediately()); + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); // Add this asssert back when getFlowSpec() is changed to return the raw flow spec //Assert.assertEquals(flowConfig.getProperties().size(), 1); Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1"); @@ -282,11 +279,8 @@ public class FlowConfigTest { try { _client.updateFlowConfig(flowConfig); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.S_404_NOT_FOUND.getCode()); - return; + Assert.fail("Bad update should pass without complaining that the spec does not exists."); } - - Assert.fail("Get should have raised a 404 error"); } @AfterClass(alwaysRun = true) http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index a99087c..f0bce17 100644 --- a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -123,7 +123,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt flowConfig.setSchedule(schedule); } - // remove keys that were injected as part of flowSpec creation flowProps.remove(ConfigurationKeys.JOB_SCHEDULE_KEY); flowProps.remove(ConfigurationKeys.JOB_TEMPLATE_PATH); @@ -180,26 +179,13 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt public CreateResponse create(FlowConfig flowConfig) { LOG.info("Create called with flowName " + flowConfig.getId().getFlowName()); - LOG.info("ReadyToUse is: " + readyToUse); - LOG.info("FlowCatalog is: " + getFlowCatalog()); + LOG.debug("ReadyToUse is: " + readyToUse); + LOG.debug("FlowCatalog is: " + getFlowCatalog()); if (!readyToUse && getFlowCatalog() == null) { throw new RuntimeException("Not ready for use."); } - try { - URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); - URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), - "/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null); - - if (getFlowCatalog().exists(flowUri)) { - logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT, - "Flow with the same name already exists: " + flowUri, null); - } - } catch (URISyntaxException e) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e); - } - getFlowCatalog().put(createFlowSpecForConfig(flowConfig)); return new CreateResponse(new ComplexResourceKey<>(flowConfig.getId(), new EmptyRecord()), HttpStatus.S_201_CREATED); @@ -216,7 +202,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt public UpdateResponse update(ComplexResourceKey<FlowId, EmptyRecord> key, FlowConfig flowConfig) { String flowGroup = key.getKey().getFlowGroup(); String flowName = key.getKey().getFlowName(); - URI flowUri = null; LOG.info("Update called with flowGroup " + flowGroup + " flowName " + flowName); @@ -225,25 +210,9 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt "flowName and flowGroup cannot be changed in update", null); } - try { - URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); - flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), - "/" + flowGroup + "/" + flowName, null, null); - if (!getFlowCatalog().exists(flowUri)) { - logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, - "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null); - } - - FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig); - - getFlowCatalog().put(newFlowSpec); + getFlowCatalog().put(createFlowSpecForConfig(flowConfig)); return new UpdateResponse(HttpStatus.S_200_OK); - } catch (URISyntaxException e) { - logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e); - } - - return null; } /** @@ -270,9 +239,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt return new UpdateResponse(HttpStatus.S_200_OK); } catch (URISyntaxException e) { logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e); - } catch (SpecNotFoundException e) { - logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup + - " flowName " + flowName, null); } return null; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java index e7c08d2..59a5025 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/FlowSpec.java @@ -126,16 +126,17 @@ public class FlowSpec implements Configurable, Spec { * <li> Default flowCatalogURI is {@link #DEFAULT_FLOW_CATALOG_SCHEME}: * <li> Convention for FlowSpec URI: <flowCatalogURI>/config.get({@link ConfigurationKeys#FLOW_GROUP_KEY})/config.get({@link ConfigurationKeys#FLOW_NAME_KEY}) * <li> Convention for Description: config.get({@link ConfigurationKeys#FLOW_DESCRIPTION_KEY}) - * <li> Default version: 1 + * <li> Default version: empty * </ul> */ public static class Builder { public static final String DEFAULT_FLOW_CATALOG_SCHEME = "gobblin-flow"; + public static final String DEFAULT_VERSION = ""; @VisibleForTesting private Optional<Config> config = Optional.absent(); private Optional<Properties> configAsProperties = Optional.absent(); private Optional<URI> uri; - private String version = "1"; + private String version = FlowSpec.Builder.DEFAULT_VERSION; private Optional<String> description = Optional.absent(); private Optional<URI> flowCatalogURI = Optional.absent(); private Optional<Set<URI>> templateURIs = Optional.absent(); @@ -163,7 +164,7 @@ public class FlowSpec implements Configurable, Spec { public FlowSpec build() { Preconditions.checkNotNull(this.uri); - Preconditions.checkNotNull(this.version); + Preconditions.checkArgument(null != version, "Version should not be null"); return new FlowSpec(getURI(), getVersion(), getDescription(), getConfig(), getConfigAsProperties(), getTemplateURIs(), getChildSpecs()); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 1cb09da..ecfe036 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -234,29 +234,22 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut log.info(String.format("Adding FlowSpec with URI: %s and Config: %s", spec.getUri(), ((FlowSpec) spec).getConfigAsProperties())); - if (specStore.exists(spec.getUri())) { - specStore.updateSpec(spec); - this.listeners.onUpdateSpec(spec); - } else { - specStore.addSpec(spec); - this.listeners.onAddSpec(spec); - } - - } catch (IOException | SpecNotFoundException e) { + specStore.addSpec(spec); + this.listeners.onAddSpec(spec); + } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e); } } @Override - public void remove(URI uri) throws SpecNotFoundException { + public void remove(URI uri) { try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); log.info(String.format("Removing FlowSpec with URI: %s", uri)); - Spec spec = specStore.getSpec(uri); - this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion()); specStore.deleteSpec(uri); + this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION); } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java index c6e02d2..7bb8b9c 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/TopologyCatalog.java @@ -30,6 +30,7 @@ import lombok.Getter; import org.apache.commons.lang3.SerializationUtils; import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.gobblin.runtime.api.FlowSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -232,15 +233,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, log.info(String.format("Adding TopologySpec with URI: %s and Config: %s", spec.getUri(), ((TopologySpec) spec).getConfigAsProperties())); - if (specStore.exists(spec.getUri())) { - specStore.updateSpec(spec); - this.listeners.onUpdateSpec(spec); - } else { specStore.addSpec(spec); this.listeners.onAddSpec(spec); - } - - } catch (IOException | SpecNotFoundException e) { + } catch (IOException e) { throw new RuntimeException("Cannot add Spec to Spec store: " + spec, e); } } @@ -252,11 +247,9 @@ public class TopologyCatalog extends AbstractIdleService implements SpecCatalog, Preconditions.checkNotNull(uri); log.info(String.format("Removing TopologySpec with URI: %s", uri)); - Spec spec = specStore.getSpec(uri); - this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion()); + this.listeners.onDeleteSpec(uri, FlowSpec.Builder.DEFAULT_VERSION); specStore.deleteSpec(uri); - - } catch (IOException | SpecNotFoundException e) { + } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java index b283c87..d87d6d4 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java @@ -25,6 +25,7 @@ import java.net.URI; import java.util.Collection; import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; @@ -114,62 +115,36 @@ public class FSSpecStore implements SpecStore { return Files.getNameWithoutExtension(specUri.getName()); } - private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException { + private Collection<Spec> getAllVersionsOfSpec(Path spec) { Collection<Spec> specs = Lists.newArrayList(); - FileStatus[] fileStatuses; - try { - fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup); - } catch (FileNotFoundException e) { - return specs; - } - for (FileStatus fileStatus : fileStatuses) { - if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) { - specs.add(readSpecFromFile(fileStatus.getPath())); - } + try { + specs.add(readSpecFromFile(spec)); + } catch (IOException e) { + log.warn("Spec {} not found.", spec); } return specs; } + /** + * Returns all versions of the spec defined by specUri. + * Currently, multiple versions are not supported, so this should return exactly one spec. + * @param specUri URI for the {@link Spec} to be retrieved. + * @return all versions of the spec. + */ @Override - public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException { + public Collection<Spec> getAllVersionsOfSpec(URI specUri) { Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); - Path specPath = new Path(specUri.getPath()); - return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath)); + Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION); + return getAllVersionsOfSpec(specPath); } @Override public boolean exists(URI specUri) throws IOException { Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); - Path flowPath = new Path(specUri.getPath()); - String specGroup = getSpecGroup(flowPath); - String specName = getSpecName(flowPath); - FileStatus[] fileStatuses; - try { - fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup); - } catch (FileNotFoundException e) { - return false; - } - // TODO Fix ETL-6496 - // We need to revisit having a version delimiter. - // Currently without a delimiter the prefix check may match other specs that should not be matched. - for (FileStatus fileStatus : fileStatuses) { - if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) { - return true; - } - } - return false; - } - - private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException { - FileStatus[] fileStatuses; - if (StringUtils.isEmpty(specGroup)) { - fileStatuses = fs.listStatus(fsSpecStoreDirPath); - } else { - fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup)); - } - return fileStatuses; + Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, FlowSpec.Builder.DEFAULT_VERSION); + return fs.exists(specPath); } @Override @@ -192,11 +167,7 @@ public class FSSpecStore implements SpecStore { public boolean deleteSpec(URI specUri) throws IOException { Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); - try { - return deleteSpec(specUri, getSpec(specUri).getVersion()); - } catch (SpecNotFoundException e) { - throw new IOException(String.format("Issue in removing Spec: %s", specUri), e); - } + return deleteSpec(specUri, FlowSpec.Builder.DEFAULT_VERSION); } @Override @@ -207,13 +178,7 @@ public class FSSpecStore implements SpecStore { try { log.info(String.format("Deleting Spec with URI: %s in FSSpecStore: %s", specUri, this.fsSpecStoreDirPath)); Path specPath = getPathForURI(this.fsSpecStoreDirPath, specUri, version); - - if (fs.exists(specPath)) { - return fs.delete(specPath, false); - } else { - log.warn("No file with URI:" + specUri + " is found. Deletion failed."); - return false; - } + return fs.delete(specPath, false); } catch (IOException e) { throw new IOException(String.format("Issue in removing Spec: %s for Version: %s", specUri, version), e); } @@ -221,17 +186,12 @@ public class FSSpecStore implements SpecStore { @Override public Spec updateSpec(Spec spec) throws IOException, SpecNotFoundException { - Preconditions.checkArgument(null != spec, "Spec should not be null"); - - log.info(String.format("Updating Spec with URI: %s in FSSpecStore: %s", spec.getUri(), this.fsSpecStoreDirPath)); - Path specPath = getPathForURI(this.fsSpecStoreDirPath, spec.getUri(), spec.getVersion()); - writeSpecToFile(specPath, spec); - + addSpec(spec); return spec; } @Override - public Spec getSpec(URI specUri) throws IOException, SpecNotFoundException { + public Spec getSpec(URI specUri) throws SpecNotFoundException { Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); Collection<Spec> specs = getAllVersionsOfSpec(specUri); @@ -296,9 +256,9 @@ public class FSSpecStore implements SpecStore { * @throws IOException */ protected Spec readSpecFromFile(Path path) throws IOException { - Spec spec = null; + Spec spec; - try (FSDataInputStream fis = fs.open(path);) { + try (FSDataInputStream fis = fs.open(path)) { spec = this.specSerDe.deserialize(ByteStreams.toByteArray(fis)); } @@ -312,12 +272,8 @@ public class FSSpecStore implements SpecStore { * @throws IOException */ protected void writeSpecToFile(Path specPath, Spec spec) throws IOException { - if (fs.exists(specPath)) { - fs.delete(specPath, true); - } - byte[] serializedSpec = this.specSerDe.serialize(spec); - try (FSDataOutputStream os = fs.create(specPath)) { + try (FSDataOutputStream os = fs.create(specPath, true)) { os.write(serializedSpec); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java index ae2e087..537cdbe 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java @@ -56,7 +56,7 @@ public class FlowCatalogTest { private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore"; private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir"; private static final String SPEC_DESCRIPTION = "Test Flow Spec"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private ServiceBasedAppLauncher serviceLauncher; private FlowCatalog flowCatalog; @@ -144,6 +144,11 @@ public class FlowCatalogTest { } @Test (dependsOnMethods = "createFlowSpec") + void testExist() throws Exception { + Assert.assertTrue(flowCatalog.exists(flowSpec.getUri())); + } + + @Test (dependsOnMethods = "testExist") public void deleteFlowSpec() throws SpecNotFoundException { // List Current Specs Collection<Spec> specs = flowCatalog.getSpecs(); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java index 48fba40..a3490ff 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; @@ -54,7 +55,7 @@ public class TopologyCatalogTest { private static final String SPEC_STORE_PARENT_DIR = "/tmp"; private static final String SPEC_STORE_DIR = "/tmp/topologyTestSpecStore"; private static final String SPEC_DESCRIPTION = "Test Topology Spec"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private ServiceBasedAppLauncher serviceLauncher; private TopologyCatalog topologyCatalog; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java index 00f8fc2..b20e3b7 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -72,7 +72,7 @@ import lombok.extern.slf4j.Slf4j; @Slf4j public class GitConfigMonitor extends AbstractIdleService { private static final String SPEC_DESCRIPTION = "Git-based flow config"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private static final int TERMINATION_TIMEOUT = 30; private static final int CONFIG_FILE_DEPTH = 3; private static final String REMOTE_NAME = "origin"; @@ -251,12 +251,7 @@ public class GitConfigMonitor extends AbstractIdleService { .withDescription(SPEC_DESCRIPTION) .build(); - try { this.flowCatalog.remove(spec.getUri()); - } catch (SpecNotFoundException e) { - // okay if flow does not exist - log.warn("Flow {} does not exist.", spec.getUri()); - } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java index 289e212..ad4180a 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceHATest.java @@ -301,16 +301,14 @@ public class GobblinServiceHATest { // Try create on both nodes try { this.node1FlowConfigClient.createFlowConfig(flowConfig1); - Assert.fail("Get should have gotten a 409 error"); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409); + Assert.fail("Create Again should pass without complaining that the spec already exists."); } try { this.node2FlowConfigClient.createFlowConfig(flowConfig2); - Assert.fail("Get should have gotten a 409 error"); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409); + Assert.fail("Create Again should pass without complaining that the spec already exists."); } } @@ -442,16 +440,14 @@ public class GobblinServiceHATest { try { this.node1FlowConfigClient.updateFlowConfig(flowConfig); - Assert.fail("Get should have raised a 404 error"); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); + Assert.fail("Bad update should pass without complaining that the spec does not exists."); } try { this.node2FlowConfigClient.updateFlowConfig(flowConfig); - Assert.fail("Get should have raised a 404 error"); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); + Assert.fail("Bad update should pass without complaining that the spec does not exists."); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java index b40792e..926dd10 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/GobblinServiceManagerTest.java @@ -66,15 +66,14 @@ public class GobblinServiceManagerTest { private static final String SERVICE_WORK_DIR = "/tmp/serviceWorkDir/"; private static final String SPEC_STORE_PARENT_DIR = "/tmp/serviceCore/"; private static final String SPEC_DESCRIPTION = "Test ServiceCore"; - private static final String SPEC_VERSION = "1"; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/serviceCore/topologyTestSpecStore"; private static final String FLOW_SPEC_STORE_DIR = "/tmp/serviceCore/flowTestSpecStore"; private static final String GIT_CLONE_DIR = "/tmp/serviceCore/clone"; private static final String GIT_REMOTE_REPO_DIR = "/tmp/serviceCore/remote"; private static final String GIT_LOCAL_REPO_DIR = "/tmp/serviceCore/local"; - private static final String TEST_GROUP_NAME = "testGroup1"; - private static final String TEST_FLOW_NAME = "testFlow1"; + private static final String TEST_GROUP_NAME = "testGroup"; + private static final String TEST_FLOW_NAME = "testFlow"; private static final String TEST_SCHEDULE = "0 1/0 * ? * *"; private static final String TEST_TEMPLATE_URI = "FS:///templates/test.template"; private static final String TEST_DUMMY_GROUP_NAME = "dummyGroup"; @@ -100,7 +99,7 @@ public class GobblinServiceManagerTest { serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".description", "StandaloneTestExecutor"); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".version", - "1"); + FlowSpec.Builder.DEFAULT_VERSION); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".uri", "gobblinExecutor"); serviceCoreProperties.put(ServiceConfigKeys.TOPOLOGY_FACTORY_PREFIX + TEST_GOBBLIN_EXECUTOR_NAME + ".specExecutorInstance", @@ -168,8 +167,7 @@ public class GobblinServiceManagerTest { flowProperties.put(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY, TEST_SINK_NAME); FlowConfig flowConfig = new FlowConfig().setId(new FlowId().setFlowGroup(TEST_GROUP_NAME).setFlowName(TEST_FLOW_NAME)) - .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE). - setRunImmediately(true)) + .setTemplateUris(TEST_TEMPLATE_URI).setSchedule(new Schedule().setCronSchedule(TEST_SCHEDULE).setRunImmediately(true)) .setProperties(new StringMap(flowProperties)); this.flowConfigClient.createFlowConfig(flowConfig); @@ -191,11 +189,8 @@ public class GobblinServiceManagerTest { try { this.flowConfigClient.createFlowConfig(flowConfig); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.CONFLICT_409); - return; + Assert.fail("Create Again should pass without complaining that the spec already exists."); } - - Assert.fail("Get should have gotten a 409 error"); } @Test (dependsOnMethods = "testCreateAgain") @@ -207,8 +202,8 @@ public class GobblinServiceManagerTest { Assert.assertEquals(flowConfig.getId().getFlowName(), TEST_FLOW_NAME); Assert.assertEquals(flowConfig.getSchedule().getCronSchedule(), TEST_SCHEDULE ); Assert.assertEquals(flowConfig.getTemplateUris(), TEST_TEMPLATE_URI); - Assert.assertTrue(flowConfig.getSchedule().isRunImmediately()); - // Add this asssert back when getFlowSpec() is changed to return the raw flow spec + Assert.assertFalse(flowConfig.getSchedule().isRunImmediately()); + // Add this assert back when getFlowSpec() is changed to return the raw flow spec //Assert.assertEquals(flowConfig.getProperties().size(), 1); Assert.assertEquals(flowConfig.getProperties().get("param1"), "value1"); } @@ -333,10 +328,8 @@ public class GobblinServiceManagerTest { try { this.flowConfigClient.updateFlowConfig(flowConfig); } catch (RestLiResponseException e) { - Assert.assertEquals(e.getStatus(), HttpStatus.NOT_FOUND_404); - return; + Assert.fail("Bad update should pass without complaining that the spec does not exists."); } - - Assert.fail("Get should have raised a 404 error"); + cleanUpDir(FLOW_SPEC_STORE_DIR); } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java index 864b238..2dbe790 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/IdentityFlowToJobSpecCompilerTest.java @@ -65,7 +65,7 @@ public class IdentityFlowToJobSpecCompilerTest { private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/"; private static final String SPEC_DESCRIPTION = "Test Orchestrator"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis(); private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis(); @@ -145,7 +145,7 @@ public class IdentityFlowToJobSpecCompilerTest { FLOW_SPEC_STORE_DIR)) .withConfig(config) .withDescription("dummy description") - .withVersion("1") + .withVersion(SPEC_VERSION) .withTemplate(new URI(TEST_TEMPLATE_URI)); } catch (URISyntaxException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java index cc722eb..8b08628 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/core/MultiHopsFlowToJobSpecCompilerTest.java @@ -76,7 +76,7 @@ public class MultiHopsFlowToJobSpecCompilerTest { private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/"; private static final String SPEC_DESCRIPTION = "Test Orchestrator"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis(); private static final String TOPOLOGY_SPEC_STORE_DIR_SECOND = "/tmp/orchestrator/topologyTestSpecStore_" + System.currentTimeMillis() + "_2"; private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore_" + System.currentTimeMillis(); @@ -288,7 +288,7 @@ public class MultiHopsFlowToJobSpecCompilerTest { FLOW_SPEC_STORE_DIR)) .withConfig(config) .withDescription("dummy description") - .withVersion("1") + .withVersion(SPEC_VERSION) .withTemplate(new URI(TEST_TEMPLATE_URI)); } catch (URISyntaxException e) { throw new RuntimeException(e); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c385f1dd/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index 8896068..6d75f9e 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -57,7 +57,7 @@ public class OrchestratorTest { private static final String SPEC_STORE_PARENT_DIR = "/tmp/orchestrator/"; private static final String SPEC_DESCRIPTION = "Test Orchestrator"; - private static final String SPEC_VERSION = "1"; + private static final String SPEC_VERSION = FlowSpec.Builder.DEFAULT_VERSION; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore"; private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore"; private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir";
