Repository: incubator-gobblin Updated Branches: refs/heads/master f3eadceed -> 2d05b03d5
[GOBBLIN-291] Remove unnecessary spec list and read resolve review comments Closes #2147 from arjun4084346/flowDelay Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/2d05b03d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/2d05b03d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/2d05b03d Branch: refs/heads/master Commit: 2d05b03d5e76f9e4056b2adc321ff4b3ef778dc5 Parents: f3eadce Author: Arjun <[email protected]> Authored: Wed Oct 25 09:29:11 2017 -0700 Committer: Hung Tran <[email protected]> Committed: Wed Oct 25 09:29:11 2017 -0700 ---------------------------------------------------------------------- .../gobblin/service/FlowConfigsResource.java | 14 ++-- .../gobblin/runtime/api/MutableSpecCatalog.java | 5 +- .../runtime/spec_catalog/FlowCatalog.java | 12 ++- .../gobblin/runtime/spec_store/FSSpecStore.java | 85 +++++++++++++++----- .../gobblin/spec_catalog/FlowCatalogTest.java | 10 ++- .../service/modules/core/GitConfigMonitor.java | 17 ++-- .../modules/orchestration/OrchestratorTest.java | 3 +- 7 files changed, 102 insertions(+), 44 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java ---------------------------------------------------------------------- diff --git a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java index 3159d49..a99087c 100644 --- 
a/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java +++ b/gobblin-restli/gobblin-flow-config-service/gobblin-flow-config-service-server/src/main/java/org/apache/gobblin/service/FlowConfigsResource.java @@ -192,14 +192,12 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt URI flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), "/" + flowConfig.getId().getFlowGroup() + "/" + flowConfig.getId().getFlowName(), null, null); - if (getFlowCatalog().getSpec(flowUri) != null) { + if (getFlowCatalog().exists(flowUri)) { logAndThrowRestLiServiceException(HttpStatus.S_409_CONFLICT, "Flow with the same name already exists: " + flowUri, null); } } catch (URISyntaxException e) { logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowConfig.getId().getFlowName(), e); - } catch (SpecNotFoundException e) { - // okay if flow does not exist } getFlowCatalog().put(createFlowSpecForConfig(flowConfig)); @@ -231,7 +229,11 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), "/" + flowGroup + "/" + flowName, null, null); - FlowSpec oldFlowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri); + if (!getFlowCatalog().exists(flowUri)) { + logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, + "Flow does not exist: flowGroup " + flowGroup + " flowName " + flowName, null); + } + FlowSpec newFlowSpec = createFlowSpecForConfig(flowConfig); getFlowCatalog().put(newFlowSpec); @@ -239,9 +241,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt return new UpdateResponse(HttpStatus.S_200_OK); } catch (URISyntaxException e) { logAndThrowRestLiServiceException(HttpStatus.S_400_BAD_REQUEST, "bad URI " + flowUri, e); 
- } catch (SpecNotFoundException e) { - logAndThrowRestLiServiceException(HttpStatus.S_404_NOT_FOUND, "Flow does not exist: flowGroup " + flowGroup + - " flowName " + flowName, null); } return null; @@ -265,7 +264,6 @@ public class FlowConfigsResource extends ComplexKeyResourceTemplate<FlowId, Empt URI flowCatalogURI = new URI("gobblin-flow", null, "/", null, null); flowUri = new URI(flowCatalogURI.getScheme(), flowCatalogURI.getAuthority(), "/" + flowGroup + "/" + flowName, null, null); - FlowSpec flowSpec = (FlowSpec) getFlowCatalog().getSpec(flowUri); getFlowCatalog().remove(flowUri); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java index 1751a56..f63600a 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/api/MutableSpecCatalog.java @@ -34,7 +34,8 @@ public interface MutableSpecCatalog extends SpecCatalog { public void put(Spec spec); /** - * Removes an existing {@link Spec} with the given URI. A no-op if such {@link Spec} does not exist. + * Removes an existing {@link Spec} with the given URI. + * Throws SpecNotFoundException if such {@link Spec} does not exist. 
*/ - void remove(URI uri); + void remove(URI uri) throws SpecNotFoundException; } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java index 8ffa4d7..1cb09da 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_catalog/FlowCatalog.java @@ -209,6 +209,14 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } } + public boolean exists(URI uri) { + try { + return specStore.exists(uri); + } catch (IOException e) { + throw new RuntimeException("Cannot retrieve Spec from Spec store for URI: " + uri, e); + } + } + @Override public Spec getSpec(URI uri) throws SpecNotFoundException { try { @@ -240,7 +248,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut } @Override - public void remove(URI uri) { + public void remove(URI uri) throws SpecNotFoundException { try { Preconditions.checkState(state() == State.RUNNING, String.format("%s is not running.", this.getClass().getName())); Preconditions.checkNotNull(uri); @@ -250,7 +258,7 @@ public class FlowCatalog extends AbstractIdleService implements SpecCatalog, Mut this.listeners.onDeleteSpec(spec.getUri(), spec.getVersion()); specStore.deleteSpec(uri); - } catch (IOException | SpecNotFoundException e) { + } catch (IOException e) { throw new RuntimeException("Cannot delete Spec from Spec store for URI: " + uri, e); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java 
---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java index 608d390..b283c87 100644 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_store/FSSpecStore.java @@ -18,6 +18,8 @@ package org.apache.gobblin.runtime.spec_store; import com.google.common.io.ByteStreams; + +import java.io.FileNotFoundException; import java.io.IOException; import java.net.URI; import java.util.Collection; @@ -35,6 +37,7 @@ import org.slf4j.LoggerFactory; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Lists; +import com.google.common.io.Files; import com.typesafe.config.Config; import org.apache.gobblin.configuration.ConfigurationKeys; @@ -98,20 +101,77 @@ public class FSSpecStore implements SpecStore { } } + /** + * @param specUri path of the spec + * @return empty string for topology spec, as topologies do not have a group, + * group name for flow spec + */ + public static String getSpecGroup(Path specUri) { + return specUri.getParent().getName(); + } + + public static String getSpecName(Path specUri) { + return Files.getNameWithoutExtension(specUri.getName()); + } + + private Collection<Spec> getAllVersionsOfSpec(String specGroup, String specName) throws IOException { + Collection<Spec> specs = Lists.newArrayList(); + FileStatus[] fileStatuses; + try { + fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup); + } catch (FileNotFoundException e) { + return specs; + } + + for (FileStatus fileStatus : fileStatuses) { + if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) { + specs.add(readSpecFromFile(fileStatus.getPath())); + } + } + return specs; + } + + @Override + public
Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException { + Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); + Path specPath = new Path(specUri.getPath()); + return getAllVersionsOfSpec(getSpecGroup(specPath), getSpecName(specPath)); + } + @Override public boolean exists(URI specUri) throws IOException { Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); + Path flowPath = new Path(specUri.getPath()); + String specGroup = getSpecGroup(flowPath); + String specName = getSpecName(flowPath); + FileStatus[] fileStatuses; + try { + fileStatuses = listSpecs(this.fsSpecStoreDirPath, specGroup); + } catch (FileNotFoundException e) { + return false; + } - FileStatus[] fileStatuses = fs.listStatus(this.fsSpecStoreDirPath); + // TODO Fix ETL-6496 + // We need to revisit having a version delimiter. + // Currently without a delimiter the prefix check may match other specs that should not be matched. for (FileStatus fileStatus : fileStatuses) { - if (StringUtils.startsWith(fileStatus.getPath().getName(), specUri.toString())) { + if (!fileStatus.isDirectory() && fileStatus.getPath().getName().startsWith(specName)) { return true; } } - return false; } + private FileStatus[] listSpecs(Path fsSpecStoreDirPath, String specGroup) throws FileNotFoundException, IOException { + FileStatus[] fileStatuses; + if (StringUtils.isEmpty(specGroup)) { + fileStatuses = fs.listStatus(fsSpecStoreDirPath); + } else { + fileStatuses = fs.listStatus(new Path(fsSpecStoreDirPath, specGroup)); + } + return fileStatuses; + } + @Override public void addSpec(Spec spec) throws IOException { Preconditions.checkArgument(null != spec, "Spec should not be null"); @@ -207,25 +267,6 @@ public class FSSpecStore implements SpecStore { } @Override - public Collection<Spec> getAllVersionsOfSpec(URI specUri) throws IOException, SpecNotFoundException { - Preconditions.checkArgument(null != specUri, "Spec URI should not be null"); - - Collection<Spec> 
specs = getSpecs(); - Collection<Spec> filteredSpecs = Lists.newArrayList(); - for (Spec spec : specs) { - if (spec.getUri().equals(specUri)) { - filteredSpecs.add(spec); - } - } - - if (filteredSpecs.size() == 0) { - throw new SpecNotFoundException(specUri); - } - - return filteredSpecs; - } - - @Override public Collection<Spec> getSpecs() throws IOException { Collection<Spec> specs = Lists.newArrayList(); try { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java index 73c1f46..ae2e087 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java @@ -24,6 +24,7 @@ import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -53,6 +54,7 @@ public class FlowCatalogTest { private static final String SPEC_STORE_PARENT_DIR = "/tmp"; private static final String SPEC_STORE_DIR = "/tmp/flowTestSpecStore"; + private static final String SPEC_GROUP_DIR = "/tmp/flowTestSpecStore/flowTestGroupDir"; private static final String SPEC_DESCRIPTION = "Test Flow Spec"; private static final String SPEC_VERSION = "1"; @@ -142,7 +144,7 @@ public class FlowCatalogTest { } @Test (dependsOnMethods = "createFlowSpec") - public void deleteFlowSpec() { + public void deleteFlowSpec() throws SpecNotFoundException { // List Current Specs Collection<Spec> specs = flowCatalog.getSpecs(); logger.info("[Before Delete] Number of specs: " + 
specs.size()); @@ -157,18 +159,18 @@ public class FlowCatalogTest { // List Specs after adding specs = flowCatalog.getSpecs(); - logger.info("[After Create] Number of specs: " + specs.size()); + logger.info("[After Delete] Number of specs: " + specs.size()); i = 0; for (Spec spec : specs) { flowSpec = (FlowSpec) spec; - logger.info("[After Create] Spec " + i++ + ": " + gson.toJson(flowSpec)); + logger.info("[After Delete] Spec " + i++ + ": " + gson.toJson(flowSpec)); } Assert.assertTrue(specs.size() == 0, "Spec store should be empty after deletion"); } public URI computeFlowSpecURI() { // Make sure this is relative - URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri(); + URI uri = PathUtils.relativizePath(new Path(SPEC_GROUP_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri(); return uri; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java index 82f3e0d..00f8fc2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GitConfigMonitor.java @@ -25,6 +25,8 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.spec_store.FSSpecStore; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -234,8 +236,8 @@ public class GitConfigMonitor extends 
AbstractIdleService { private void removeSpec(DiffEntry change) { if (checkConfigFilePath(change.getOldPath())) { Path configFilePath = new Path(this.repositoryDir, change.getOldPath()); - String flowName = Files.getNameWithoutExtension(configFilePath.getName()); - String flowGroup = configFilePath.getParent().getName(); + String flowName = FSSpecStore.getSpecName(configFilePath); + String flowGroup = FSSpecStore.getSpecGroup(configFilePath); // build a dummy config to get the proper URI for delete Config dummyConfig = ConfigBuilder.create() @@ -249,7 +251,12 @@ public class GitConfigMonitor extends AbstractIdleService { .withDescription(SPEC_DESCRIPTION) .build(); - this.flowCatalog.remove(spec.getUri()); + try { + this.flowCatalog.remove(spec.getUri()); + } catch (SpecNotFoundException e) { + // okay if flow does not exist + log.warn("Flow {} does not exist.", spec.getUri()); + } } } @@ -285,8 +292,8 @@ public class GitConfigMonitor extends AbstractIdleService { */ private Config loadConfigFileWithFlowNameOverrides(Path configFilePath) throws IOException { Config flowConfig = this.pullFileLoader.loadPullFile(configFilePath, emptyConfig, false); - String flowName = Files.getNameWithoutExtension(configFilePath.getName()); - String flowGroup = configFilePath.getParent().getName(); + String flowName = FSSpecStore.getSpecName(configFilePath); + String flowGroup = FSSpecStore.getSpecGroup(configFilePath); return flowConfig.withValue(ConfigurationKeys.FLOW_NAME_KEY, ConfigValueFactory.fromAnyRef(flowName)) .withValue(ConfigurationKeys.FLOW_GROUP_KEY, ConfigValueFactory.fromAnyRef(flowGroup)); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/2d05b03d/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java 
b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java index a933e85..8896068 100644 --- a/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java +++ b/gobblin-service/src/test/java/org/apache/gobblin/service/modules/orchestration/OrchestratorTest.java @@ -60,6 +60,7 @@ public class OrchestratorTest { private static final String SPEC_VERSION = "1"; private static final String TOPOLOGY_SPEC_STORE_DIR = "/tmp/orchestrator/topologyTestSpecStore"; private static final String FLOW_SPEC_STORE_DIR = "/tmp/orchestrator/flowTestSpecStore"; + private static final String FLOW_SPEC_GROUP_DIR = "/tmp/orchestrator/flowTestSpecStore/flowTestGroupDir"; private ServiceBasedAppLauncher serviceLauncher; private TopologyCatalog topologyCatalog; @@ -141,7 +142,7 @@ public class OrchestratorTest { FlowSpec.Builder flowSpecBuilder = null; try { flowSpecBuilder = FlowSpec.builder(computeTopologySpecURI(SPEC_STORE_PARENT_DIR, - FLOW_SPEC_STORE_DIR)) + FLOW_SPEC_GROUP_DIR)) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION)
