http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java new file mode 100644 index 0000000..e0a235c --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/AbstractSpecExecutor.java @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.spec_executorInstance; + +import java.net.URI; +import java.net.URISyntaxException; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import com.google.common.base.Optional; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Maps; +import com.google.common.io.Closer; +import com.google.common.util.concurrent.AbstractIdleService; +import com.typesafe.config.Config; + +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.util.CompletedFuture; + +import edu.umd.cs.findbugs.annotations.SuppressWarnings; + + +/** + * An abstract implementation of SpecExecutor without specifying communication mechanism. 
+ * + * Normally in the implementation of {@link AbstractSpecExecutor}, it is necessary to specify: + * {@link SpecProducer} + * {@link SpecConsumer} + * {@link Closer} + */ +public abstract class AbstractSpecExecutor extends AbstractIdleService implements SpecExecutor { + + private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); + private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); + + protected final transient Logger log; + + // Executor Instance identifier + protected final URI specExecutorInstanceUri; + + @SuppressWarnings(justification = "No bug", value = "SE_BAD_FIELD") + protected final Config config; + + protected final Map<ServiceNode, ServiceNode> capabilities; + + /** + * While AbstractSpecExecutor is up, for most producer implementations (like SimpleKafkaSpecProducer), + * they implements {@link java.io.Closeable} which requires registration and close methods. + * {@link Closer} is mainly used for managing {@link SpecProducer} and {@link SpecConsumer}. + */ + protected Optional<Closer> optionalCloser; + + public AbstractSpecExecutor(Config config) { + this(config, Optional.<Logger>absent()); + } + + public AbstractSpecExecutor(Config config, GobblinInstanceEnvironment env) { + this(config, Optional.of(env.getLog())); + } + + public AbstractSpecExecutor(Config config, Optional<Logger> log) { + + /** + * Since URI is regarded as the unique identifier for {@link SpecExecutor}(Used in equals method) + * it is dangerous to use default URI. + */ + if (!config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY)) { + if (log.isPresent()) { + log.get().warn("The SpecExecutor doesn't specify URI, using the default one."); + } + } + + try { + specExecutorInstanceUri = + new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, "NA")); + } catch (URISyntaxException e) { + throw new RuntimeException(e); + } + this.log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); + this.config = config; + this.capabilities = Maps.newHashMap(); + if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) { + String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY); + List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr); + for (String capability : capabilities) { + List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability); + Preconditions.checkArgument(currentCapability.size() == 2, + "Only one source:destination pair is supported " + "per capability, found: " + currentCapability); + this.capabilities.put(new BaseServiceNodeImpl(currentCapability.get(0)), + new BaseServiceNodeImpl(currentCapability.get(1))); + } + } + optionalCloser = Optional.absent(); + } + + @Override + public URI getUri() { + return specExecutorInstanceUri; + } + + /** + * The definition of attributes are the technology that a {@link SpecExecutor} is using and + * the physical location that it runs on. + * + * These attributes are supposed to be static and read-only. + */ + @Override + public Config getAttrs() { + Preconditions.checkArgument(this.config.hasPath(ServiceConfigKeys.ATTRS_PATH_IN_CONFIG), + "Input configuration doesn't contains SpecExecutor Attributes path."); + return this.config.getConfig(ServiceConfigKeys.ATTRS_PATH_IN_CONFIG); + } + + @Override + public Future<Config> getConfig() { + return new CompletedFuture(this.config, null); + } + + @Override + public Future<? extends Map<ServiceNode, ServiceNode>> getCapabilities() { + return new CompletedFuture(this.capabilities, null); + } + + /** + * Two {@link SpecExecutor}s with the same {@link #specExecutorInstanceUri} + * should be considered as the same {@link SpecExecutor}. 
+ */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + AbstractSpecExecutor that = (AbstractSpecExecutor) o; + + return specExecutorInstanceUri.equals(that.specExecutorInstanceUri); + } + + @Override + public int hashCode() { + return specExecutorInstanceUri.hashCode(); + } + + /** + * @return In default implementation we just return 'Healthy'. + */ + @Override + public Future<String> getHealth() { + return new CompletedFuture("Healthy", null); + } + + abstract protected void startUp() throws Exception; + + abstract protected void shutDown() throws Exception; + + abstract public Future<? extends SpecProducer> getProducer(); + + abstract public Future<String> getDescription(); +}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java new file mode 100644 index 0000000..dcc0c3b --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/BaseServiceNodeImpl.java @@ -0,0 +1,100 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.runtime.spec_executorInstance; + +import java.util.Properties; + +import com.google.common.base.Preconditions; +import com.typesafe.config.Config; + +import lombok.Setter; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.util.ConfigUtils; + +import lombok.Getter; + +/** + * A base implementation for {@link ServiceNode} with default secured setting. 
+ */ +public class BaseServiceNodeImpl implements ServiceNode { + + @Getter + public String nodeName; + + /** + * Contains read-only properties of an {@link ServiceNode} + */ + @Getter + public Config nodeProps; + + /** + * One of mutable properties of Node. + * Initialization: Obtained from {@link ServiceConfigKeys}. + * Getter/Setter: Simply thur. {@link BaseServiceNodeImpl}. + */ + @Getter + @Setter + private boolean isNodeSecure; + + /** + * For nodes missing configuration + * @param nodeName + */ + public BaseServiceNodeImpl(String nodeName) { + this(nodeName, new Properties()); + } + + public BaseServiceNodeImpl(String nodeName, Properties props) { + Preconditions.checkNotNull(nodeName); + this.nodeName = nodeName; + isNodeSecure = Boolean.parseBoolean + (props.getProperty(ServiceConfigKeys.NODE_SECURITY_KEY, ServiceConfigKeys.DEFAULT_NODE_SECURITY)); + nodeProps = ConfigUtils.propertiesToConfig(props); + } + + /** + * By default each node is acceptable to use in path-finding. + */ + @Override + public boolean isNodeEnabled() { + return true; + } + + /** + * The comparison between two nodes should involve the configuration. + * Node name is the identifier for the node. 
+ * */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + BaseServiceNodeImpl that = (BaseServiceNodeImpl) o; + + return nodeName.equals(that.nodeName); + } + + @Override + public int hashCode() { + return nodeName.hashCode(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java new file mode 100644 index 0000000..e0be4e9 --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutor.java @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.spec_executorInstance; + +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigValue; +import java.net.URI; +import java.util.Properties; +import java.util.concurrent.Future; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; + + + +/** + * An {@link SpecExecutor} implementation that keep provisioned {@link Spec} in memory. + * Therefore there's no necessity to install {@link SpecConsumer} in this case. + */ +public class InMemorySpecExecutor extends AbstractSpecExecutor { + // Communication mechanism components. + // Not specifying final for further extension based on this implementation. + private SpecProducer<Spec> inMemorySpecProducer; + + public InMemorySpecExecutor(Config config){ + this(config, Optional.absent()); + } + + public InMemorySpecExecutor(Config config, GobblinInstanceEnvironment env){ + this(config, Optional.of(env.getLog())); + } + + public InMemorySpecExecutor(Config config, Optional<Logger> log) { + super(config, log); + inMemorySpecProducer = new InMemorySpecProducer(config); + } + + /** + * A creator that create a SpecExecutor only specifying URI for uniqueness. 
+ * @param uri + */ + public static SpecExecutor createDummySpecExecutor(URI uri) { + Properties properties = new Properties(); + properties.setProperty(ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, uri.toString()); + return new InMemorySpecExecutor(ConfigFactory.parseProperties(properties)); + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture("InMemory SpecExecutor", null); + } + + @Override + public Future<? extends SpecProducer> getProducer(){ + return new CompletedFuture(this.inMemorySpecProducer, null); + } + + @Override + protected void startUp() throws Exception { + // Nothing to do in the abstract implementation. + } + + @Override + protected void shutDown() throws Exception { + // Nothing to do in the abstract implementation. + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java deleted file mode 100644 index 77faaa7..0000000 --- a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecExecutorInstanceProducer.java +++ /dev/null @@ -1,147 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.runtime.spec_executorInstance; - -import java.io.Serializable; -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; - -import edu.umd.cs.findbugs.annotations.SuppressWarnings; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.typesafe.config.Config; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.GobblinInstanceEnvironment; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; -import org.apache.gobblin.util.CompletedFuture; - - -public class InMemorySpecExecutorInstanceProducer implements SpecExecutorInstanceProducer<Spec>, Serializable { - - private static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); - private static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); - - private static final long serialVersionUID = 6106269076155338045L; - - protected final transient Logger log; - protected final Map<URI, Spec> provisionedSpecs; - @SuppressWarnings (justification="No bug", value="SE_BAD_FIELD") - protected final Config config; - protected final Map<String, String> capabilities; - - public 
InMemorySpecExecutorInstanceProducer(Config config) { - this(config, Optional.<Logger>absent()); - } - - public InMemorySpecExecutorInstanceProducer(Config config, GobblinInstanceEnvironment env) { - this(config, Optional.of(env.getLog())); - } - - public InMemorySpecExecutorInstanceProducer(Config config, Optional<Logger> log) { - this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - this.config = config; - this.provisionedSpecs = Maps.newHashMap(); - this.capabilities = Maps.newHashMap(); - if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) { - String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY); - List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr); - for (String capability : capabilities) { - List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability); - Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported " - + "per capability, found: " + currentCapability); - this.capabilities.put(currentCapability.get(0), currentCapability.get(1)); - } - } - } - - @Override - public URI getUri() { - try { - return new URI("InMemorySpecExecutorInstanceProducer"); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - } - - @Override - public Future<String> getDescription() { - return new CompletedFuture("InMemory SpecExecutorInstanceProducer", null); - } - - @Override - public Future<Config> getConfig() { - return new CompletedFuture(this.config, null); - } - - @Override - public Future<String> getHealth() { - return new CompletedFuture("Healthy", null); - } - - @Override - public Future<? 
extends Map<String, String>> getCapabilities() { - return new CompletedFuture(this.capabilities, null); - } - - @Override - public Future<?> addSpec(Spec addedSpec) { - provisionedSpecs.put(addedSpec.getUri(), addedSpec); - log.info(String.format("Added Spec: %s with Uri: %s for execution on this executor.", addedSpec, addedSpec.getUri())); - - return new CompletedFuture(Boolean.TRUE, null); - } - - @Override - public Future<?> updateSpec(Spec updatedSpec) { - if (!provisionedSpecs.containsKey(updatedSpec.getUri())) { - throw new RuntimeException("Spec not found: " + updatedSpec.getUri()); - } - provisionedSpecs.put(updatedSpec.getUri(), updatedSpec); - log.info(String.format("Updated Spec: %s with Uri: %s for execution on this executor.", updatedSpec, updatedSpec.getUri())); - - return new CompletedFuture(Boolean.TRUE, null); - } - - @Override - public Future<?> deleteSpec(URI deletedSpecURI) { - if (!provisionedSpecs.containsKey(deletedSpecURI)) { - throw new RuntimeException("Spec not found: " + deletedSpecURI); - } - provisionedSpecs.remove(deletedSpecURI); - log.info(String.format("Deleted Spec with Uri: %s from this executor.", deletedSpecURI)); - - return new CompletedFuture(Boolean.TRUE, null); - } - - @Override - public Future<? 
extends List<Spec>> listSpecs() { - return new CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java new file mode 100644 index 0000000..80f64ec --- /dev/null +++ b/gobblin-runtime/src/main/java/org/apache/gobblin/runtime/spec_executorInstance/InMemorySpecProducer.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.runtime.spec_executorInstance; + +import java.io.Serializable; +import java.net.URI; +import java.util.List; +import java.util.Map; +import java.util.concurrent.Future; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; + +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.util.CompletedFuture; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class InMemorySpecProducer implements SpecProducer<Spec>, Serializable { + private final Map<URI, Spec> provisionedSpecs; + private transient Config config; + + private static final long serialVersionUID = 6106269076155338045L; + + public InMemorySpecProducer(Config config) { + this.config = config; + this.provisionedSpecs = Maps.newHashMap(); + } + + @Override + public Future<?> addSpec(Spec addedSpec) { + provisionedSpecs.put(addedSpec.getUri(), addedSpec); + log.info(String.format("Added Spec: %s with Uri: %s for execution on this executor.", addedSpec, addedSpec.getUri())); + + return new CompletedFuture(Boolean.TRUE, null); + } + + @Override + public Future<?> updateSpec(Spec updatedSpec) { + if (!provisionedSpecs.containsKey(updatedSpec.getUri())) { + throw new RuntimeException("Spec not found: " + updatedSpec.getUri()); + } + provisionedSpecs.put(updatedSpec.getUri(), updatedSpec); + log.info(String.format("Updated Spec: %s with Uri: %s for execution on this executor.", updatedSpec, updatedSpec.getUri())); + + return new CompletedFuture(Boolean.TRUE, null); + } + + @Override + public Future<?> deleteSpec(URI deletedSpecURI) { + if (!provisionedSpecs.containsKey(deletedSpecURI)) { + throw new RuntimeException("Spec not found: " + deletedSpecURI); + } + provisionedSpecs.remove(deletedSpecURI); + log.info(String.format("Deleted Spec with Uri: %s from this executor.", deletedSpecURI)); + + return new CompletedFuture(Boolean.TRUE, null); + } + + 
@Override + public Future<? extends List<Spec>> listSpecs() { + return new CompletedFuture<>(Lists.newArrayList(provisionedSpecs.values()), null); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java index c42e605..73c1f46 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/FlowCatalogTest.java @@ -38,13 +38,13 @@ import com.google.gson.GsonBuilder; import com.typesafe.config.Config; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.runtime.spec_catalog.FlowCatalog; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; +import org.apache.gobblin.runtime.api.SpecExecutor; public class FlowCatalogTest { @@ -89,7 +89,7 @@ public class FlowCatalogTest { properties.put("specExecInstance.capabilities", "source:destination"); Config config = ConfigUtils.propertiesToConfig(properties); - SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config); + SpecExecutor specExecutorInstanceProducer = new InMemorySpecExecutor(config); FlowSpec.Builder flowSpecBuilder = null; try { @@ -171,4 +171,4 @@ public class FlowCatalogTest { URI uri = 
PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri(); return uri; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java ---------------------------------------------------------------------- diff --git a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java index 594c755..48fba40 100644 --- a/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java +++ b/gobblin-runtime/src/test/java/org/apache/gobblin/spec_catalog/TopologyCatalogTest.java @@ -23,6 +23,7 @@ import java.util.Collection; import java.util.Properties; import org.apache.commons.io.FileUtils; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.hadoop.fs.Path; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -37,11 +38,10 @@ import com.google.gson.GsonBuilder; import com.typesafe.config.Config; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; import org.apache.gobblin.runtime.api.TopologySpec; import org.apache.gobblin.runtime.app.ServiceBasedAppLauncher; import org.apache.gobblin.runtime.spec_catalog.TopologyCatalog; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; +import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.PathUtils; @@ -89,13 +89,13 @@ public class TopologyCatalogTest { properties.put("specExecInstance.capabilities", "source:destination"); Config config = ConfigUtils.propertiesToConfig(properties); - SpecExecutorInstanceProducer specExecutorInstanceProducer = new InMemorySpecExecutorInstanceProducer(config); + SpecExecutor 
specExecutorInstanceProducer = new InMemorySpecExecutor(config); TopologySpec.Builder topologySpecBuilder = TopologySpec.builder(computeTopologySpecURI()) .withConfig(config) .withDescription(SPEC_DESCRIPTION) .withVersion(SPEC_VERSION) - .withSpecExecutorInstanceProducer(specExecutorInstanceProducer); + .withSpecExecutor(specExecutorInstanceProducer); return topologySpecBuilder.build(); } @@ -166,4 +166,4 @@ public class TopologyCatalogTest { URI uri = PathUtils.relativizePath(new Path(SPEC_STORE_DIR), new Path(SPEC_STORE_PARENT_DIR)).toUri(); return uri; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-service/build.gradle b/gobblin-service/build.gradle index 642d818..0b461bd 100644 --- a/gobblin-service/build.gradle +++ b/gobblin-service/build.gradle @@ -58,6 +58,7 @@ dependencies { compile externalDependency.javaxInject compile externalDependency.jgit compile externalDependency.jodaTime + compile externalDependency.jgrapht compile externalDependency.kafka08 compile externalDependency.log4j compile externalDependency.lombok http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java b/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java deleted file mode 100644 index 4f01623..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/HelixUtils.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import com.google.common.annotations.VisibleForTesting; -import java.util.UUID; -import org.apache.helix.Criteria; -import org.apache.helix.HelixManager; -import org.apache.helix.HelixManagerFactory; -import org.apache.helix.InstanceType; -import org.apache.helix.manager.zk.ZKHelixManager; -import org.apache.helix.model.HelixConfigScope; -import org.apache.helix.model.Message; -import org.apache.helix.tools.ClusterSetup; - -import org.apache.gobblin.annotation.Alpha; -import org.slf4j.Logger; - - -@Alpha -public class HelixUtils { - - /*** - * Build a Helix Manager (Helix Controller instance). - * - * @param helixInstanceName the Helix Instance name. - * @param helixClusterName the Helix Cluster name. - * @param zkConnectionString the ZooKeeper connection string. - * @return HelixManager - */ - public static HelixManager buildHelixManager(String helixInstanceName, String helixClusterName, String zkConnectionString) { - return HelixManagerFactory.getZKHelixManager(helixClusterName, helixInstanceName, - InstanceType.CONTROLLER, zkConnectionString); - } - - /** - * Create a Helix cluster for the Gobblin Cluster application. 
- * - * @param zkConnectionString the ZooKeeper connection string - * @param clusterName the Helix cluster name - */ - public static void createGobblinHelixCluster(String zkConnectionString, String clusterName) { - createGobblinHelixCluster(zkConnectionString, clusterName, true); - } - - /** - * Create a Helix cluster for the Gobblin Cluster application. - * - * @param zkConnectionString the ZooKeeper connection string - * @param clusterName the Helix cluster name - * @param overwrite true to overwrite exiting cluster, false to reuse existing cluster - */ - public static void createGobblinHelixCluster(String zkConnectionString, String clusterName, boolean overwrite) { - ClusterSetup clusterSetup = new ClusterSetup(zkConnectionString); - // Create the cluster and overwrite if it already exists - clusterSetup.addCluster(clusterName, overwrite); - // Helix 0.6.x requires a configuration property to have the form key=value. - String autoJoinConfig = ZKHelixManager.ALLOW_PARTICIPANT_AUTO_JOIN + "=true"; - clusterSetup.setConfig(HelixConfigScope.ConfigScopeProperty.CLUSTER, clusterName, autoJoinConfig); - } - - /** - * Get a Helix instance name. 
- * - * @param namePrefix a prefix of Helix instance names - * @param instanceId an integer instance ID - * @return a Helix instance name that is a concatenation of the given prefix and instance ID - */ - public static String getHelixInstanceName(String namePrefix, int instanceId) { - return namePrefix + "_" + instanceId; - } - - @VisibleForTesting - public static void sendUserDefinedMessage(String messageSubType, String messageVal, String messageId, - InstanceType instanceType, HelixManager helixManager, Logger logger) { - Criteria criteria = new Criteria(); - criteria.setInstanceName("%"); - criteria.setResource("%"); - criteria.setPartition("%"); - criteria.setPartitionState("%"); - criteria.setRecipientInstanceType(instanceType); - criteria.setSessionSpecific(true); - - Message message = new Message(Message.MessageType.USER_DEFINE_MSG.toString(), messageId); - message.setMsgSubType(messageSubType); - message.setAttribute(Message.Attributes.INNER_MESSAGE, messageVal); - message.setMsgState(Message.MessageState.NEW); - message.setTgtSessionId("*"); - - int messagesSent = helixManager.getMessagingService().send(criteria, message); - if (messagesSent == 0) { - logger.error(String.format("Failed to send the %s message to the participants", message)); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java deleted file mode 100644 index 8ea19c4..0000000 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.service; - -import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutorInstanceProducer; -import org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler; -import org.apache.gobblin.service.modules.topology.ConfigBasedTopologySpecFactory; - -@Alpha -public class ServiceConfigKeys { - - private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service."; - - // Gobblin Service Manager Keys - public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled"; - public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled"; - public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled"; - public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled"; - public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; - public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; - public static final String 
GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; - - // Helix / ServiceScheduler Keys - public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name"; - public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_SERVICE_PREFIX + "zk.connection.string"; - public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name"; - public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName"; - public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec"; - - // Helix message sub types for FlowSpec - public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD"; - public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE"; - public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE"; - - // Flow Compiler Keys - public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = GOBBLIN_SERVICE_PREFIX + "flowCompiler.class"; - public static final String DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS = IdentityFlowToJobSpecCompiler.class.getCanonicalName(); - - // Flow specific Keys - public static final String FLOW_SOURCE_IDENTIFIER_KEY = "gobblin.flow.sourceIdentifier"; - public static final String FLOW_DESTINATION_IDENTIFIER_KEY = "gobblin.flow.destinationIdentifier"; - - // Command line options - public static final String SERVICE_NAME_OPTION_NAME = "service_name"; - - // Topology Factory Keys (for overall factory) - public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory."; - public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY = ConfigBasedTopologySpecFactory.class.getCanonicalName(); - public static final String TOPOLOGYSPEC_FACTORY_KEY = TOPOLOGY_FACTORY_PREFIX + "class"; - public static final String TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY = TOPOLOGY_FACTORY_PREFIX + "topologyNames"; - - // Topology Factory Keys (for individual topologies) - public static final 
String TOPOLOGYSPEC_DESCRIPTION_KEY = "description"; - public static final String TOPOLOGYSPEC_VERSION_KEY = "version"; - public static final String TOPOLOGYSPEC_URI_KEY = "uri"; - public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_PRODUCER = InMemorySpecExecutorInstanceProducer.class.getCanonicalName(); - public static final String SPEC_EXECUTOR_INSTANCE_PRODUCER_KEY = "specExecutorInstanceProducer.class"; - - // Template Catalog Keys - public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath"; - - // Logging - public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties"; -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java index c2591e1..598371d 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/core/GobblinServiceManager.java @@ -83,7 +83,7 @@ import org.apache.gobblin.scheduler.SchedulerService; import org.apache.gobblin.service.FlowConfig; import org.apache.gobblin.service.FlowConfigClient; import org.apache.gobblin.service.FlowConfigsResource; -import org.apache.gobblin.service.HelixUtils; +import org.apache.gobblin.service.modules.utils.HelixUtils; import org.apache.gobblin.service.ServiceConfigKeys; import org.apache.gobblin.service.modules.orchestration.Orchestrator; import org.apache.gobblin.service.modules.scheduler.GobblinServiceJobScheduler; 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java new file mode 100644 index 0000000..26f4463 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/BaseFlowToJobSpecCompiler.java @@ -0,0 +1,259 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flow; + +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import javax.annotation.Nonnull; + +import com.codahale.metrics.Meter; +import com.codahale.metrics.Timer; +import com.google.common.base.Optional; +import com.google.common.collect.Maps; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigValueFactory; + +import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.configuration.State; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.gobblin.metrics.MetricContext; +import org.apache.gobblin.metrics.Tag; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecCompiler; +import org.apache.gobblin.runtime.api.TopologySpec; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.instrumented.Instrumented; +import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; +import org.apache.gobblin.service.ServiceConfigKeys; +import org.apache.gobblin.service.ServiceMetricNames; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowSpec; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.JobTemplate; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.api.SpecNotFoundException; +import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; + +import lombok.Getter; +import lombok.Setter; + +// Provide base implementation for constructing multi-hops route. +@Alpha +public abstract class BaseFlowToJobSpecCompiler implements SpecCompiler { + + // Since {@link SpecCompiler} is an {@link SpecCatalogListener}, it is expected that any Spec change should be reflected + // to these data structures. 
+ @Getter + @Setter + protected final Map<URI, TopologySpec> topologySpecMap; + + + /** + * Mapping between each FlowEdge and a list of applicable Templates. + * Compiler should obtain this Map info from higher level component. + * since {@link TopologySpec} doesn't contain Templates. + * Key: EdgeIdentifier from {@link org.apache.gobblin.runtime.api.FlowEdge#getEdgeIdentity()} + * Value: List of template URI. + */ + // TODO: Define how template info are instantiated. ETL-6217 + @Getter + @Setter + protected final Map<String, List<URI>> edgeTemplateMap; + + + protected final Config config; + protected final Logger log; + protected final Optional<FSJobCatalog> templateCatalog; + + protected final MetricContext metricContext; + @Getter + protected Optional<Meter> flowCompilationSuccessFulMeter; + @Getter + protected Optional<Meter> flowCompilationFailedMeter; + @Getter + protected Optional<Timer> flowCompilationTimer; + + /** + * Creates a compiler with instrumentation enabled and a logger named after the concrete class. + * @param config the configuration kept by the compiler and used to set up the template catalog + */ + public BaseFlowToJobSpecCompiler(Config config){ + this(config,true); + } + + /** + * Creates a compiler with a logger named after the concrete class. + * @param config the configuration kept by the compiler and used to set up the template catalog + * @param instrumentationEnabled whether the metric context, meters and timer are created; this flag + * is forwarded as given (it used to be dropped and replaced by a hard-coded true) + */ + public BaseFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled){ + this(config, Optional.<Logger>absent(), instrumentationEnabled); + } + + /** + * Creates a compiler with instrumentation enabled. + * @param config the configuration kept by the compiler and used to set up the template catalog + * @param log the logger to use; when absent, a logger named after the concrete class is created + */ + public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log){ + this(config, log,true); + } + + /** + * Primary constructor that all other constructors delegate to. + * @param config the configuration kept by the compiler and used to set up the template catalog + * @param log the logger to use; when absent, a logger named after the concrete class is created + * @param instrumentationEnabled whether the metric context, meters and timer are created + */ + public BaseFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean instrumentationEnabled){ + this.log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); + if (instrumentationEnabled) { + this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); + this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); + this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); + this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); + } + else { + this.metricContext = null; + this.flowCompilationSuccessFulMeter = Optional.absent(); + this.flowCompilationFailedMeter = Optional.absent(); + this.flowCompilationTimer = Optional.absent(); + } + + this.topologySpecMap = Maps.newConcurrentMap(); + this.edgeTemplateMap = Maps.newConcurrentMap(); + this.config = config; + + /*** + * ETL-5996 + * For multi-tenancy, the following needs to be added: + * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs + * 2. 
Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that + */ + try { + if (this.config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY) + && StringUtils.isNotBlank(this.config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) { + Config templateCatalogCfg = config + .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, + this.config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); + this.templateCatalog = Optional.of(new FSJobCatalog(templateCatalogCfg)); + } else { + this.templateCatalog = Optional.absent(); + } + } catch (IOException e) { + throw new RuntimeException("Could not initialize FlowCompiler because of " + + "TemplateCatalog initialization failure", e); + } + } + + @Override + public synchronized void onAddSpec(Spec addedSpec) { + topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec); + } + + @Override + public synchronized void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { + if (topologySpecMap.containsKey(deletedSpecURI)) { + topologySpecMap.remove(deletedSpecURI); + } + } + + @Override + public synchronized void onUpdateSpec(Spec updatedSpec) { + topologySpecMap.put(updatedSpec.getUri(), (TopologySpec) updatedSpec); + } + + @Nonnull + @Override + public MetricContext getMetricContext() { + return this.metricContext; + } + + @Override + public boolean isInstrumentationEnabled() { + return null != this.metricContext; + } + + @Override + public List<Tag<?>> generateTags(State state){ + return Collections.emptyList(); + } + + @Override + public void switchMetricContext(List<Tag<?>> tags) { + throw new UnsupportedOperationException(); + } + + @Override + public void switchMetricContext(MetricContext context) { + throw new UnsupportedOperationException(); + } + + @Override + public Map<URI, TopologySpec> getTopologySpecMap() { + return this.topologySpecMap; + } + + public abstract Map<Spec, SpecExecutor> 
compileFlow(Spec spec); + + /** + * Naive implementation of generating a {@link JobSpec} from a {@link FlowSpec}, as used by the + * exemplified single-hop FlowCompiler implementation. When the flow declares template URIs and a + * template catalog is configured, only the first template URI is resolved against the catalog; + * otherwise the JobSpec is built unresolved. The schedule is then removed, and the flow name and + * flow group (when present) are copied to the job name and job group. + * @param flowSpec the flow whose URI, config, description and version seed the JobSpec + * @return the generated JobSpec + */ + protected JobSpec jobSpecGenerator(FlowSpec flowSpec) { + JobSpec jobSpec; + JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri()) + .withConfig(flowSpec.getConfig()) + .withDescription(flowSpec.getDescription()) + .withVersion(flowSpec.getVersion()); + + if (flowSpec.getTemplateURIs().isPresent() && templateCatalog.isPresent()) { + // Only first template uri will be honored for Identity + jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next()); + try { + jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get()); + log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); + } catch (SpecNotFoundException | JobTemplate.TemplateException e) { + throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e); + } + } else { + jobSpec = jobSpecBuilder.build(); + log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); + } + + // Remove schedule + jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)); + + // Add job.name and job.group + if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) { + jobSpec.setConfig(jobSpec.getConfig() + .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY))); + } + if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) { + jobSpec.setConfig(jobSpec.getConfig() + .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY))); + } + + // Add flow execution id for this compilation + long flowExecutionId = System.currentTimeMillis(); + jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, 
+ ConfigValueFactory.fromAnyRef(flowExecutionId))); + + // Reset properties in Spec from Config + jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); + return jobSpec; + } + + /** + * Ideally each edge has its own eligible template repository(Based on {@link SpecExecutor}) + * to pick templates from. + * + * This function is to transform from all mixed templates ({@link #templateCatalog}) + * into categorized {@link #edgeTemplateMap}. + * + */ + abstract protected void populateEdgeTemplateMap(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java new file mode 100644 index 0000000..751bb09 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/FlowEdgeProps.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service.modules.flow; + +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; +import com.typesafe.config.ConfigObject; +import java.io.Serializable; +import java.util.Properties; + +import static org.apache.gobblin.service.ServiceConfigKeys.*; + +import lombok.Data; +import lombok.Getter; +import lombok.Setter; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class FlowEdgeProps { + protected static final boolean DEFAULT_EDGE_SAFETY = true; + + /** + * Contains read-only properties that users want to package in. + */ + @Getter + protected Config config; + + /** + * Security flag of the edge; one of the mutable properties of an edge. + * Initialized from {@link #config} in the constructor. + */ + @Getter + @Setter + private boolean isEdgeSecure; + + public FlowEdgeProps(Config config) { + this.config = config; + isEdgeSecure = getInitialEdgeSafety(); + } + + public FlowEdgeProps() { + this(ConfigFactory.empty()); + } + + /** + * Reads the initial security flag of the edge from {@link #config}: the boolean stored under + * EDGE_SECURITY_KEY when that path exists, otherwise {@link #DEFAULT_EDGE_SAFETY}. + * The flag can be overridden afterwards through the setter. + */ + private boolean getInitialEdgeSafety() { + return + config.hasPath(EDGE_SECURITY_KEY) ? 
config.getBoolean(EDGE_SECURITY_KEY) : DEFAULT_EDGE_SAFETY; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java ---------------------------------------------------------------------- diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java index 9e5fd11..3fb20a2 100644 --- a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/IdentityFlowToJobSpecCompiler.java @@ -17,188 +17,82 @@ package org.apache.gobblin.service.modules.flow; -import java.io.IOException; -import java.net.URI; -import java.util.Collections; -import java.util.List; + import java.util.Map; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; -import javax.annotation.Nonnull; -import lombok.Getter; -import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; -import org.slf4j.LoggerFactory; -import com.codahale.metrics.Meter; -import com.codahale.metrics.Timer; import com.google.common.base.Optional; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; import com.typesafe.config.Config; -import com.typesafe.config.ConfigValueFactory; import org.apache.gobblin.annotation.Alpha; -import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.instrumented.Instrumented; -import org.apache.gobblin.metrics.MetricContext; -import org.apache.gobblin.metrics.Tag; import org.apache.gobblin.runtime.api.FlowSpec; import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.JobTemplate; import org.apache.gobblin.runtime.api.Spec; -import 
org.apache.gobblin.runtime.api.SpecCompiler; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; -import org.apache.gobblin.runtime.api.SpecNotFoundException; import org.apache.gobblin.runtime.api.TopologySpec; -import org.apache.gobblin.runtime.job_catalog.FSJobCatalog; -import org.apache.gobblin.runtime.job_spec.ResolvedJobSpec; import org.apache.gobblin.service.ServiceConfigKeys; -import org.apache.gobblin.service.ServiceMetricNames; -import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.SpecExecutor; /*** * Take in a logical {@link Spec} ie flow and compile corresponding materialized job {@link Spec} - * and its mapping to {@link SpecExecutorInstance}. + * and its mapping to {@link SpecExecutor}. */ @Alpha -public class IdentityFlowToJobSpecCompiler implements SpecCompiler { - - private final Map<URI, TopologySpec> topologySpecMap; - private final Config config; - private final Logger log; - private final Optional<FSJobCatalog> templateCatalog; - - protected final MetricContext metricContext; - @Getter - private Optional<Meter> flowCompilationSuccessFulMeter; - @Getter - private Optional<Meter> flowCompilationFailedMeter; - @Getter - private Optional<Timer> flowCompilationTimer; +public class IdentityFlowToJobSpecCompiler extends BaseFlowToJobSpecCompiler { public IdentityFlowToJobSpecCompiler(Config config) { - this(config, true); + super(config, true); } public IdentityFlowToJobSpecCompiler(Config config, boolean instrumentationEnabled) { - this(config, Optional.<Logger>absent(), instrumentationEnabled); + super(config, Optional.<Logger>absent(), instrumentationEnabled); } public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log) { - this(config, log, true); + super(config, log, true); } public IdentityFlowToJobSpecCompiler(Config config, Optional<Logger> log, boolean 
instrumentationEnabled) { - this.log = log.isPresent() ? log.get() : LoggerFactory.getLogger(getClass()); - if (instrumentationEnabled) { - this.metricContext = Instrumented.getMetricContext(ConfigUtils.configToState(config), IdentityFlowToJobSpecCompiler.class); - this.flowCompilationSuccessFulMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_SUCCESSFUL_METER)); - this.flowCompilationFailedMeter = Optional.of(this.metricContext.meter(ServiceMetricNames.FLOW_COMPILATION_FAILED_METER)); - this.flowCompilationTimer = Optional.<Timer>of(this.metricContext.timer(ServiceMetricNames.FLOW_COMPILATION_TIMER)); - } - else { - this.metricContext = null; - this.flowCompilationSuccessFulMeter = Optional.absent(); - this.flowCompilationFailedMeter = Optional.absent(); - this.flowCompilationTimer = Optional.absent(); - } - - this.topologySpecMap = Maps.newConcurrentMap(); - this.config = config; - /*** - * For multi-tenancy, the following needs to be added: - * 1. Change singular templateCatalog to Map<URI, JobCatalogWithTemplates> to support multiple templateCatalogs - * 2. 
Pick templateCatalog from JobCatalogWithTemplates based on URI, and try to resolve JobSpec using that - */ - try { - if (this.config.hasPath(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY) - && StringUtils.isNotBlank(this.config.getString(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY))) { - Config templateCatalogCfg = config - .withValue(ConfigurationKeys.JOB_CONFIG_FILE_GENERAL_PATH_KEY, - this.config.getValue(ServiceConfigKeys.TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY)); - this.templateCatalog = Optional.of(new FSJobCatalog(templateCatalogCfg)); - } else { - this.templateCatalog = Optional.absent(); - } - } catch (IOException e) { - throw new RuntimeException("Could not initialize IdentityFlowToJobSpecCompiler because of " - + "TemplateCatalog initialization failure", e); - } + super(config, log, instrumentationEnabled); } @Override - public Map<Spec, SpecExecutorInstanceProducer> compileFlow(Spec spec) { + public Map<Spec, SpecExecutor> compileFlow(Spec spec) { Preconditions.checkNotNull(spec); Preconditions.checkArgument(spec instanceof FlowSpec, "IdentityFlowToJobSpecCompiler only converts FlowSpec to JobSpec"); long startTime = System.nanoTime(); - Map<Spec, SpecExecutorInstanceProducer> specExecutorInstanceMap = Maps.newLinkedHashMap(); + Map<Spec, SpecExecutor> specExecutorMap = Maps.newLinkedHashMap(); FlowSpec flowSpec = (FlowSpec) spec; String source = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_SOURCE_IDENTIFIER_KEY); String destination = flowSpec.getConfig().getString(ServiceConfigKeys.FLOW_DESTINATION_IDENTIFIER_KEY); log.info(String.format("Compiling flow for source: %s and destination: %s", source, destination)); - JobSpec jobSpec; - JobSpec.Builder jobSpecBuilder = JobSpec.builder(flowSpec.getUri()) - .withConfig(flowSpec.getConfig()) - .withDescription(flowSpec.getDescription()) - .withVersion(flowSpec.getVersion()); - - if (flowSpec.getTemplateURIs().isPresent() && templateCatalog.isPresent()) { - // Only 
first template uri will be honored for Identity - jobSpecBuilder = jobSpecBuilder.withTemplate(flowSpec.getTemplateURIs().get().iterator().next()); - try { - jobSpec = new ResolvedJobSpec(jobSpecBuilder.build(), templateCatalog.get()); - log.info("Resolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); - } catch (SpecNotFoundException | JobTemplate.TemplateException e) { - throw new RuntimeException("Could not resolve template in JobSpec from TemplateCatalog", e); - } - } else { - jobSpec = jobSpecBuilder.build(); - log.info("Unresolved JobSpec properties are: " + jobSpec.getConfigAsProperties()); - } - - // Remove schedule - jobSpec.setConfig(jobSpec.getConfig().withoutPath(ConfigurationKeys.JOB_SCHEDULE_KEY)); - - // Add job.name and job.group - if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_NAME_KEY)) { - jobSpec.setConfig(jobSpec.getConfig() - .withValue(ConfigurationKeys.JOB_NAME_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_NAME_KEY))); - } - if (flowSpec.getConfig().hasPath(ConfigurationKeys.FLOW_GROUP_KEY)) { - jobSpec.setConfig(jobSpec.getConfig() - .withValue(ConfigurationKeys.JOB_GROUP_KEY, flowSpec.getConfig().getValue(ConfigurationKeys.FLOW_GROUP_KEY))); - } - - // Add flow execution id for this compilation - long flowExecutionId = System.currentTimeMillis(); - jobSpec.setConfig(jobSpec.getConfig().withValue(ConfigurationKeys.FLOW_EXECUTION_ID_KEY, - ConfigValueFactory.fromAnyRef(flowExecutionId))); - - // Reset properties in Spec from Config - jobSpec.setConfigAsProperties(ConfigUtils.configToProperties(jobSpec.getConfig())); + JobSpec jobSpec = jobSpecGenerator(flowSpec); for (TopologySpec topologySpec : topologySpecMap.values()) { try { - Map<String, String> capabilities = (Map<String, String>) topologySpec.getSpecExecutorInstanceProducer().getCapabilities().get(); - for (Map.Entry<String, String> capability : capabilities.entrySet()) { + Map<ServiceNode, ServiceNode> capabilities = (Map<ServiceNode, ServiceNode>) 
topologySpec.getSpecExecutor().getCapabilities().get(); + for (Map.Entry<ServiceNode, ServiceNode> capability : capabilities.entrySet()) { log.info(String.format("Evaluating current JobSpec: %s against TopologySpec: %s with " - + "capability of source: %s and destination: %s ", jobSpec.getUri(), + + "capability of source: %s and destination: %s ", jobSpec.getUri(), topologySpec.getUri(), capability.getKey(), capability.getValue())); - if (source.equals(capability.getKey()) && destination.equals(capability.getValue())) { - specExecutorInstanceMap.put(jobSpec, topologySpec.getSpecExecutorInstanceProducer()); + if (source.equals(capability.getKey().getNodeName()) && destination.equals(capability.getValue().getNodeName())) { + specExecutorMap.put(jobSpec, topologySpec.getSpecExecutor()); log.info(String.format("Current JobSpec: %s is executable on TopologySpec: %s. Added TopologySpec as candidate.", jobSpec.getUri(), topologySpec.getUri())); log.info("Since we found a candidate executor, we will not try to compute more. 
" + "(Intended limitation for IdentityFlowToJobSpecCompiler)"); - return specExecutorInstanceMap; + return specExecutorMap; } } } catch (InterruptedException | ExecutionException e) { @@ -209,52 +103,12 @@ public class IdentityFlowToJobSpecCompiler implements SpecCompiler { Instrumented.markMeter(this.flowCompilationSuccessFulMeter); Instrumented.updateTimer(this.flowCompilationTimer, System.nanoTime() - startTime, TimeUnit.NANOSECONDS); - return specExecutorInstanceMap; - } - - @Override - public Map<URI, TopologySpec> getTopologySpecMap() { - return this.topologySpecMap; - } - - @Override - public void onAddSpec(Spec addedSpec) { - topologySpecMap.put(addedSpec.getUri(), (TopologySpec) addedSpec); - } - - @Override - public void onDeleteSpec(URI deletedSpecURI, String deletedSpecVersion) { - topologySpecMap.remove(deletedSpecURI); - } - - @Override - public void onUpdateSpec(Spec updatedSpec) { - topologySpecMap.put(updatedSpec.getUri(), (TopologySpec) updatedSpec); - } - - @Nonnull - @Override - public MetricContext getMetricContext() { - return this.metricContext; - } - - @Override - public boolean isInstrumentationEnabled() { - return null != this.metricContext; - } - - @Override - public List<Tag<?>> generateTags(org.apache.gobblin.configuration.State state) { - return Collections.emptyList(); - } - - @Override - public void switchMetricContext(List<Tag<?>> tags) { - throw new UnsupportedOperationException(); + return specExecutorMap; } @Override - public void switchMetricContext(MetricContext context) { - throw new UnsupportedOperationException(); + protected void populateEdgeTemplateMap() { + log.warn("No population of templates based on edge happen in this implementation"); + return; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java ---------------------------------------------------------------------- 
diff --git a/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java new file mode 100644 index 0000000..83b94e3 --- /dev/null +++ b/gobblin-service/src/main/java/org/apache/gobblin/service/modules/flow/LoadBasedFlowEdgeImpl.java @@ -0,0 +1,180 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service.modules.flow; + +import com.typesafe.config.Config; + +import org.jgrapht.graph.DefaultWeightedEdge; +import org.apache.gobblin.annotation.Alpha; +import org.apache.gobblin.runtime.api.FlowEdge; +import org.apache.gobblin.runtime.api.ServiceNode; +import org.apache.gobblin.runtime.api.SpecExecutor; + +import lombok.Getter; + +/** + * A base implementation of a flowEdge in the weight multi-edge graph. + * For a weightedMultiGraph there could be multiple edges between two vertices. + * Recall that a triplet of <SourceNode, targetNode, specExecutor> determines one edge. 
+ * It is expected that {@link org.jgrapht.graph.DirectedWeightedMultigraph#getAllEdges(Object, Object)} + * can return multiple edges with the same pair of source and destination but different SpecExecutor. + * + * Each edge has a {@FlowEdgeProp} which contains mutable and immutable properties. + * The {@link LoadBasedFlowEdgeImpl} exposes two mutable properties: Load and Security. + * + * Load of an edge is equivalent to weight defined in {@link DefaultWeightedEdge}. + * Since {@link #getWeight()} method is protected, {@link #getEdgeLoad()} will return the load. + * There's no setLoad, which is logically supposed to happen by invoking + * {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)}. + * + * Security of an edge describes if an edge is secure to be part of data movement path at current stage. + * + */ +@Alpha +public class LoadBasedFlowEdgeImpl extends DefaultWeightedEdge implements FlowEdge { + + /** + * In our cases {@link LoadBasedFlowEdgeImpl} is not likely to be serialized. + * While as it extends {@link DefaultWeightedEdge} for best practice we made all fields transient, + * and specify serialVersionUID. + */ + private static final long serialVersionUID = 1L; + + @Getter + private transient ServiceNode sourceNode; + @Getter + private transient ServiceNode targetNode; + @Getter + private transient SpecExecutor specExecutorInstance; + + /** + * Contains both read-only and mutable attributes of properties of an edge. + * Mutable properties in{@link FlowEdgeProps} expose their Setter & Getter + * thru. either the {@link FlowEdgeProps} + * or graph-level api, e.g. {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)} + * + * Typical mutable properties of an edge includes: + * Load(Weight), Security. 
+ */ + private final transient FlowEdgeProps flowEdgeProps; + + public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode, + FlowEdgeProps flowEdgeProps, SpecExecutor specExecutorInstance) { + this.sourceNode = sourceNode; + this.targetNode = targetNode; + this.flowEdgeProps = flowEdgeProps; + this.specExecutorInstance = specExecutorInstance; + } + + public LoadBasedFlowEdgeImpl(ServiceNode sourceNode, ServiceNode targetNode, + SpecExecutor specExecutor) { + this(sourceNode, targetNode, new FlowEdgeProps(specExecutor.getAttrs()), + specExecutor); + } + + // Load: Directly using {@link DefaultWeightedEdge}'s weight field. + /** + * Load: + * Initialization: super's default constructor + * Getter: {@link #getEdgeLoad()}} thru. {@link DefaultWeightedEdge}'s {@link #getWeight()}. + * Setter:Thru. {@link org.jgrapht.graph.DirectedWeightedMultigraph#setEdgeWeight(Object, double)} + */ + public double getEdgeLoad() { + return getWeight(); + } + + // Security: Get/Set thru. FlowEdgeProps + /** + * Initialization\Getter\Setter: By {@link FlowEdgeProps} + */ + public boolean getIsEdgeSecure() { + return flowEdgeProps.isEdgeSecure(); + } + public void setIsEdgeSecure(boolean isEdgeSecure) { + this.flowEdgeProps.setEdgeSecure(isEdgeSecure); + } + + + @Override + public String getEdgeIdentity() { + return this.calculateEdgeIdentity(this.sourceNode, this.targetNode, this.specExecutorInstance); + } + + @Override + public Config getEdgeProperties() { + return this.flowEdgeProps.getConfig(); + } + + @Override + /** + * Naive rule: If edge is secure, then it is qualified to be considered in path-finding. + */ + public boolean isEdgeEnabled() { + return this.flowEdgeProps.isEdgeSecure(); + } + + + /** + * A naive implementation of edge identity calculation. 
+ * @return + */ + public static String calculateEdgeIdentity(ServiceNode sourceNode, ServiceNode targetNode, SpecExecutor specExecutorInstance){ + return sourceNode.getNodeName() + "-" + specExecutorInstance.getUri() + "-" + targetNode.getNodeName(); + } + + /** + * Recall that we need a triplet to uniquely define a {@link FlowEdge}: + * - {@link ServiceNode} sourceNode + * - {@link ServiceNode} targetNode + * - {@link SpecExecutor} SpecExecutor + * + * We DO NOT distinguish between two edges by other props like weight, + * as the load should be an attribute of an edge. + * These are IntelliJ-generated methods for equals and hashCode(). + * + * @param o The object that being compared + * @return If two {@link LoadBasedFlowEdgeImpl} are equivalent. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + + LoadBasedFlowEdgeImpl that = (LoadBasedFlowEdgeImpl) o; + + if (!sourceNode.equals(that.sourceNode)) { + return false; + } + if (!targetNode.equals(that.targetNode)) { + return false; + } + return specExecutorInstance.equals(that.specExecutorInstance); + } + + @Override + public int hashCode() { + int result = sourceNode.hashCode(); + result = 31 * result + targetNode.hashCode(); + result = 31 * result + specExecutorInstance.hashCode(); + return result; + } +} \ No newline at end of file
