[GOBBLIN-3] Multi-hop flow compiler implementation Closes #2078 from autumnust/flowcompiler
Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/9402a903 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/9402a903 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/9402a903 Branch: refs/heads/master Commit: 9402a9037554bcae4cc958a69a85eb4a16e8c179 Parents: ea5047e Author: Lei Sun <[email protected]> Authored: Tue Sep 12 02:29:05 2017 -0700 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Sep 12 02:29:21 2017 -0700 ---------------------------------------------------------------------- conf/service/application.conf | 2 +- .../apache/gobblin/runtime/api/FlowEdge.java | 51 +++ .../apache/gobblin/runtime/api/ServiceNode.java | 43 +++ .../gobblin/runtime/api/SpecConsumer.java | 35 ++ .../gobblin/runtime/api/SpecExecutor.java | 79 +++++ .../runtime/api/SpecExecutorInstance.java | 71 ---- .../api/SpecExecutorInstanceConsumer.java | 30 -- .../api/SpecExecutorInstanceProducer.java | 44 --- .../gobblin/runtime/api/SpecProducer.java | 46 +++ .../gobblin/service/ServiceConfigKeys.java | 109 +++++++ .../GobblinClusterConfigurationKeys.java | 12 +- .../ScheduledJobConfigurationManager.java | 49 ++- .../StreamingJobConfigurationManager.java | 55 ++-- .../orchestration/AzkabanSpecExecutor.java | 76 +++++ .../AzkabanSpecExecutorInstance.java | 108 ------ .../AzkabanSpecExecutorInstanceProducer.java | 176 ---------- .../orchestration/AzkabanSpecProducer.java | 176 ++++++++++ .../service/SimpleKafkaSpecConsumer.java | 264 +++++++++++++++ .../service/SimpleKafkaSpecExecutor.java | 105 ++++++ .../SimpleKafkaSpecExecutorInstance.java | 131 -------- ...SimpleKafkaSpecExecutorInstanceConsumer.java | 261 --------------- ...SimpleKafkaSpecExecutorInstanceProducer.java | 139 -------- .../service/SimpleKafkaSpecProducer.java | 140 ++++++++ .../service/StreamingKafkaSpecConsumer.java | 173 ++++++++++ ...eamingKafkaSpecExecutorInstanceConsumer.java | 
171 ---------- .../SimpleKafkaSpecExecutorInstanceTest.java | 180 ---------- .../service/SimpleKafkaSpecExecutorTest.java | 180 ++++++++++ .../StreamingKafkaSpecExecutorInstanceTest.java | 192 ----------- .../service/StreamingKafkaSpecExecutorTest.java | 191 +++++++++++ .../gobblin/runtime/api/SpecCompiler.java | 10 +- .../gobblin/runtime/api/TopologySpec.java | 66 ++-- .../job_monitor/AvroJobSpecKafkaJobMonitor.java | 20 +- .../AbstractSpecExecutor.java | 188 +++++++++++ .../BaseServiceNodeImpl.java | 100 ++++++ .../InMemorySpecExecutor.java | 93 ++++++ .../InMemorySpecExecutorInstanceProducer.java | 147 --------- .../InMemorySpecProducer.java | 82 +++++ .../gobblin/spec_catalog/FlowCatalogTest.java | 8 +- .../spec_catalog/TopologyCatalogTest.java | 10 +- gobblin-service/build.gradle | 1 + .../org/apache/gobblin/service/HelixUtils.java | 110 ------- .../gobblin/service/ServiceConfigKeys.java | 80 ----- .../modules/core/GobblinServiceManager.java | 2 +- .../modules/flow/BaseFlowToJobSpecCompiler.java | 259 +++++++++++++++ .../service/modules/flow/FlowEdgeProps.java | 67 ++++ .../flow/IdentityFlowToJobSpecCompiler.java | 192 ++--------- .../modules/flow/LoadBasedFlowEdgeImpl.java | 180 ++++++++++ .../flow/MultiHopsFlowToJobSpecCompiler.java | 313 ++++++++++++++++++ .../modules/orchestration/Orchestrator.java | 27 +- .../service/modules/policy/ServicePolicy.java | 51 +++ .../modules/policy/StaticServicePolicy.java | 98 ++++++ .../scheduler/GobblinServiceJobScheduler.java | 2 +- .../ConfigBasedTopologySpecFactory.java | 27 +- .../service/modules/utils/DistancedNode.java | 77 +++++ .../service/modules/utils/FindPathUtils.java | 109 +++++++ .../service/modules/utils/HelixUtils.java | 110 +++++++ .../modules/core/GobblinServiceHATest.java | 9 +- .../modules/core/GobblinServiceManagerTest.java | 6 +- .../core/IdentityFlowToJobSpecCompilerTest.java | 21 +- .../MultiHopsFlowToJobSpecCompilerTest.java | 326 +++++++++++++++++++ .../modules/orchestration/OrchestratorTest.java 
| 31 +- .../ConfigBasedTopologySpecFactoryTest.java | 10 +- gradle/scripts/dependencyDefinitions.gradle | 1 + 63 files changed, 3921 insertions(+), 2201 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/conf/service/application.conf ---------------------------------------------------------------------- diff --git a/conf/service/application.conf b/conf/service/application.conf index 3e292a0..3cb5b34 100644 --- a/conf/service/application.conf +++ b/conf/service/application.conf @@ -25,7 +25,7 @@ topologySpecFactory.topologyNames=localGobblinCluster topologySpecFactory.localGobblinCluster.description="StandaloneClusterTopology" topologySpecFactory.localGobblinCluster.version="1" topologySpecFactory.localGobblinCluster.uri="gobblinCluster" -topologySpecFactory.localGobblinCluster.specExecutorInstanceProducer.class="org.apache.gobblin.service.SimpleKafkaSpecExecutorInstanceProducer" +topologySpecFactory.localGobblinCluster.specExecutorInstance.class="org.apache.gobblin.service.SimpleKafkaSpecProducer" topologySpecFactory.localGobblinCluster.specExecInstance.capabilities="externalSource:InternalSink" topologySpecFactory.localGobblinCluster.writer.kafka.topics="SimpleKafkaSpecExecutorInstanceTest" topologySpecFactory.localGobblinCluster.writer.kafka.producerConfig.bootstrap.servers="localhost:9092" http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java new file mode 100644 index 0000000..9dc6413 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/FlowEdge.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) 
under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import com.typesafe.config.Config; + + +/** + * A typical edge consists of two types of attributes: + * - Numerical value based: Return an numerical value for evaluation. + * - Boolean value based: Return either true or false. + */ +public interface FlowEdge { + + /** + * @return Uniqueness of an edge is defined by + * - sourceNode + * - targetNode + * - SpecExecutor + * hashCode and equals is required to implemented accordingly. + */ + String getEdgeIdentity(); + + /** + * Return read-only Edge Properties . + * @return + */ + Config getEdgeProperties(); + + /** + * @return If a edge should be considered as part of flow spec compilation result, + * based on all boolean-based properties like safety. 
+ */ + boolean isEdgeEnabled(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java new file mode 100644 index 0000000..eeb74c4 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/ServiceNode.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import com.typesafe.config.Config; + +/** + * Abstraction of a Node in {@link SpecExecutor} + * 'Service' here refers to 'Service' in GaaS and it is not necessary related to a Service interface. + */ +public interface ServiceNode { + /** + * @return The name of node. + * It should be the identifier of a {@link ServiceNode}. + */ + String getNodeName(); + + /** + * @return The attributes of a {@link ServiceNode}. 
+ */ + Config getNodeProps(); + + /** + * @return if the node is valid to use + */ + boolean isNodeEnabled(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java new file mode 100644 index 0000000..6dc16f4 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecConsumer.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.util.List; +import java.util.concurrent.Future; +import org.apache.commons.lang3.tuple.Pair; + + +/** + * A communication socket (receiving side for this class) + * for each {@link SpecExecutor} to receive spec to execute from Orchestrator. + * Implementation of this interface should specify communication channel (e.g. Kafka, REST, etc.) 
+ */ +public interface SpecConsumer<V> { + + /** List of newly changed {@link Spec}s for execution on {@link SpecExecutor}. */ + Future<? extends List<Pair<SpecExecutor.Verb, V>>> changedSpecs(); + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java new file mode 100644 index 0000000..cb5197a --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutor.java @@ -0,0 +1,79 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.net.URI; +import java.util.Map; +import java.util.concurrent.Future; + +import com.typesafe.config.Config; + + +/** + * Defines a representation of JobSpec-Executor in GaaS. + * A triplet of <Technology, location, communication mechanism> uniquely defines an object of SpecExecutor. + * e.g. 
<Lumos, Holdem, Rest> represents a Executor that moves data by Lumos, running on Holdem can be reached by Rest. + */ +public interface SpecExecutor { + /** An URI identifying the SpecExecutor. */ + URI getUri(); + + /** Human-readable description of the SpecExecutor .*/ + Future<String> getDescription(); + + /** SpecExecutor config as a typesafe config object. */ + Future<Config> getConfig(); + + /** SpecExecutor attributes include Location of SpecExecutor and the Type of it (Technology it used for data movement, + * like, gobblin-standalone/gobblin-cluster + * SpecExecutor attributes are supposed to be read-only once instantiated. + * */ + Config getAttrs(); + + /** Health of SpecExecutor. */ + Future<String> getHealth(); + + /** Source : Destination processing capabilities of SpecExecutor. */ + Future<? extends Map<ServiceNode, ServiceNode>> getCapabilities(); + + /** A communication socket for generating spec to assigned physical executors, paired with + * a consumer on the physical executor side. */ + Future<? 
extends SpecProducer> getProducer(); + + public static enum Verb { + ADD(1, "add"), + UPDATE(2, "update"), + DELETE(3, "delete"); + + private int _id; + private String _verb; + + Verb(int id, String verb) { + _id = id; + _verb = verb; + } + + public int getId() { + return _id; + } + + public String getVerb() { + return _verb; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java deleted file mode 100644 index 47569f3..0000000 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstance.java +++ /dev/null @@ -1,71 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ - -package org.apache.gobblin.runtime.api; - -import java.net.URI; -import java.util.Map; -import java.util.concurrent.Future; - -import com.typesafe.config.Config; - -import org.apache.gobblin.annotation.Alpha; - - -/** - * Defines a SpecExecutorInstance (typically a standalone instance, cluster or Azkaban deployment) - * that can execute a {@link Spec}. - */ -@Alpha -public interface SpecExecutorInstance { - /** An URI identifying the SpecExecutorInstance. */ - URI getUri(); - - /** Human-readable description of the SpecExecutorInstance .*/ - Future<String> getDescription(); - - /** SpecExecutorInstance config as a typesafe config object. */ - Future<Config> getConfig(); - - /** Health of SpecExecutorInstance. */ - Future<String> getHealth(); - - /** Source : Destination processing capabilities of SpecExecutorInstance. */ - Future<? extends Map<String, String>> getCapabilities(); - - public static enum Verb { - ADD(1, "add"), - UPDATE(2, "update"), - DELETE(3, "delete"); - - private int _id; - private String _verb; - - Verb(int id, String verb) { - _id = id; - _verb = verb; - } - - public int getId() { - return _id; - } - - public String getVerb() { - return _verb; - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java deleted file mode 100644 index 475c5af..0000000 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceConsumer.java +++ /dev/null @@ -1,30 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. 
See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.runtime.api; - -import java.util.List; -import java.util.concurrent.Future; -import org.apache.commons.lang3.tuple.Pair; - - -public interface SpecExecutorInstanceConsumer<V> extends SpecExecutorInstance { - - /** List of newly changed {@link Spec}s for execution on {@link SpecExecutorInstance}. */ - Future<? extends List<Pair<Verb, V>>> changedSpecs(); - -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java deleted file mode 100644 index 12508da..0000000 --- a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecExecutorInstanceProducer.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. 
- * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.gobblin.runtime.api; - -import java.net.URI; -import java.util.List; -import java.util.concurrent.Future; - -import org.apache.gobblin.annotation.Alpha; - - -/** - * Defines a SpecExecutorInstanceProducer to produce jobs to {@link SpecExecutorInstance} - * that can execute a {@link Spec}. - */ -@Alpha -public interface SpecExecutorInstanceProducer<V> extends SpecExecutorInstance { - /** Add a {@link Spec} for execution on {@link SpecExecutorInstance}. */ - Future<?> addSpec(V addedSpec); - - /** Update a {@link Spec} being executed on {@link SpecExecutorInstance}. */ - Future<?> updateSpec(V updatedSpec); - - /** Delete a {@link Spec} being executed on {@link SpecExecutorInstance}. */ - Future<?> deleteSpec(URI deletedSpecURI); - - /** List all {@link Spec} being executed on {@link SpecExecutorInstance}. */ - Future<? 
extends List<V>> listSpecs(); -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java new file mode 100644 index 0000000..9b9e504 --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/runtime/api/SpecProducer.java @@ -0,0 +1,46 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.runtime.api; + +import java.net.URI; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.gobblin.annotation.Alpha; + + +/** + * Defines a SpecProducer to produce jobs to {@link SpecExecutor} + * that can execute a {@link Spec}. + * + * A handle on the Orchestrator side to send {@link Spec}s. + */ +@Alpha +public interface SpecProducer<V> { + /** Add a {@link Spec} for execution on {@link SpecExecutor}. */ + Future<?> addSpec(V addedSpec); + + /** Update a {@link Spec} being executed on {@link SpecExecutor}. 
*/ + Future<?> updateSpec(V updatedSpec); + + /** Delete a {@link Spec} being executed on {@link SpecExecutor}. */ + Future<?> deleteSpec(URI deletedSpecURI); + + /** List all {@link Spec} being executed on {@link SpecExecutor}. */ + Future<? extends List<V>> listSpecs(); +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java new file mode 100644 index 0000000..7231e0c --- /dev/null +++ b/gobblin-api/src/main/java/org/apache/gobblin/service/ServiceConfigKeys.java @@ -0,0 +1,109 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import org.apache.gobblin.annotation.Alpha; + +@Alpha +public class ServiceConfigKeys { + + private static final String GOBBLIN_SERVICE_PREFIX = "gobblin.service."; + + // Gobblin Service Manager Keys + public static final String GOBBLIN_SERVICE_TOPOLOGY_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologyCatalog.enabled"; + public static final String GOBBLIN_SERVICE_FLOW_CATALOG_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "flowCatalog.enabled"; + public static final String GOBBLIN_SERVICE_SCHEDULER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "scheduler.enabled"; + public static final String GOBBLIN_SERVICE_ORCHESTRATOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "orchestrator.enabled"; + public static final String GOBBLIN_SERVICE_RESTLI_SERVER_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "restliServer.enabled"; + public static final String GOBBLIN_SERVICE_TOPOLOGY_SPEC_FACTORY_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "topologySpecFactory.enabled"; + public static final String GOBBLIN_SERVICE_GIT_CONFIG_MONITOR_ENABLED_KEY = GOBBLIN_SERVICE_PREFIX + "gitConfigMonitor.enabled"; + + // Helix / ServiceScheduler Keys + public static final String HELIX_CLUSTER_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helix.cluster.name"; + public static final String ZK_CONNECTION_STRING_KEY = GOBBLIN_SERVICE_PREFIX + "zk.connection.string"; + public static final String HELIX_INSTANCE_NAME_OPTION_NAME = "helix_instance_name"; + public static final String HELIX_INSTANCE_NAME_KEY = GOBBLIN_SERVICE_PREFIX + "helixInstanceName"; + public static final String GOBBLIN_SERVICE_FLOWSPEC = GOBBLIN_SERVICE_PREFIX + "flowSpec"; + + // Helix message sub types for FlowSpec + public static final String HELIX_FLOWSPEC_ADD = "FLOWSPEC_ADD"; + public static final String HELIX_FLOWSPEC_REMOVE = "FLOWSPEC_REMOVE"; + public static final String HELIX_FLOWSPEC_UPDATE = "FLOWSPEC_UPDATE"; + + // Flow Compiler Keys + public static final String GOBBLIN_SERVICE_FLOWCOMPILER_CLASS_KEY = 
GOBBLIN_SERVICE_PREFIX + "flowCompiler.class"; + /** + * Directly use canonical class name here to avoid introducing additional dependency here. + */ + public static final String DEFAULT_GOBBLIN_SERVICE_FLOWCOMPILER_CLASS = + "org.apache.gobblin.service.modules.flow.IdentityFlowToJobSpecCompiler"; + + // Flow specific Keys + public static final String FLOW_SOURCE_IDENTIFIER_KEY = "gobblin.flow.sourceIdentifier"; + public static final String FLOW_DESTINATION_IDENTIFIER_KEY = "gobblin.flow.destinationIdentifier"; + + // Command line options + public static final String SERVICE_NAME_OPTION_NAME = "service_name"; + + // Topology Factory Keys (for overall factory) + public static final String TOPOLOGY_FACTORY_PREFIX = "topologySpecFactory."; + public static final String DEFAULT_TOPOLOGY_SPEC_FACTORY = + "org.apache.gobblin.service.modules.topology.ConfigBasedTopologySpecFactory"; + public static final String TOPOLOGYSPEC_FACTORY_KEY = TOPOLOGY_FACTORY_PREFIX + "class"; + public static final String TOPOLOGY_FACTORY_TOPOLOGY_NAMES_KEY = TOPOLOGY_FACTORY_PREFIX + "topologyNames"; + + // Topology Factory Keys (for individual topologies) + public static final String TOPOLOGYSPEC_DESCRIPTION_KEY = "description"; + public static final String TOPOLOGYSPEC_VERSION_KEY = "version"; + public static final String TOPOLOGYSPEC_URI_KEY = "uri"; + + public static final String DEFAULT_SPEC_EXECUTOR = + "org.apache.gobblin.runtime.spec_executorInstance.InMemorySpecExecutor"; + public static final String SPEC_EXECUTOR_KEY = "specExecutorInstance.class"; + public static final String EDGE_SECURITY_KEY = "edge.secured"; + + + // Template Catalog Keys + public static final String TEMPLATE_CATALOGS_FULLY_QUALIFIED_PATH_KEY = GOBBLIN_SERVICE_PREFIX + "templateCatalogs.fullyQualifiedPath"; + + // Keys related to user-specified policy on route selection. + // Undesired connection to form an executable JobSpec. 
+ // Formatted as a String list, each entry contains a string in the format of "Source1:Sink1:URI", + // which indicates that data movement from source1 to sink1 with specific URI of specExecutor should be avoided. + public static final String POLICY_BASED_BLOCKED_CONNECTION = GOBBLIN_SERVICE_PREFIX + "blockedConnections"; + + // Comma separated list of nodes that is blacklisted. Names put here will become the nodeName which is the ID of a serviceNode. + public static final String POLICY_BASED_BLOCKED_NODES = GOBBLIN_SERVICE_PREFIX + "blockedNodes"; + // Complete path of how the data movement is executed from source to sink. + // Formatted as a String, each hop separated by comma, from source to sink in order. + public static final String POLICY_BASED_DATA_MOVEMENT_PATH = GOBBLIN_SERVICE_PREFIX + "fullDataPath"; + + public static final String ATTRS_PATH_IN_CONFIG = "executorAttrs"; + + // Gobblin Service Graph Representation Topology related Keys + public static final String NODE_SECURITY_KEY = "node.secured"; + // True means node is by default secure. 
+ public static final String DEFAULT_NODE_SECURITY = "true"; + + + // Policy related configuration Keys + public static final String DEFAULT_SERVICE_POLICY = "static"; + public static final String SERVICE_POLICY_NAME = GOBBLIN_SERVICE_PREFIX + "servicePolicy"; + // Logging + public static final String GOBBLIN_SERVICE_LOG4J_CONFIGURATION_FILE = "log4j-service.properties"; +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java index e8dfb1d..ea75dc3 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/GobblinClusterConfigurationKeys.java @@ -68,11 +68,11 @@ public class GobblinClusterConfigurationKeys { public static final String JOB_CONFIGURATION_MANAGER_KEY = GOBBLIN_CLUSTER_PREFIX + "job.configuration.manager"; public static final String JOB_SPEC_REFRESH_INTERVAL = GOBBLIN_CLUSTER_PREFIX + "job.spec.refresh.interval"; - public static final String SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX + "specConsumer.class"; - public static final String DEFAULT_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS = - "org.apache.gobblin.service.SimpleKafkaSpecExecutorInstanceConsumer"; - public static final String DEFAULT_STREAMING_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS = - "org.apache.gobblin.service.StreamingKafkaSpecExecutorInstanceConsumer"; + public static final String SPEC_CONSUMER_CLASS_KEY = GOBBLIN_CLUSTER_PREFIX + "specConsumer.class"; + public static final String DEFAULT_SPEC_CONSUMER_CLASS = + "org.apache.gobblin.service.SimpleKafkaSpecConsumer"; + public static 
final String DEFAULT_STREAMING_SPEC_CONSUMER_CLASS = + "org.apache.gobblin.service.StreamingKafkaSpecConsumer"; public static final String JOB_CATALOG_KEY = GOBBLIN_CLUSTER_PREFIX + "job.catalog"; public static final String DEFAULT_JOB_CATALOG = "org.apache.gobblin.runtime.job_catalog.NonObservingFSJobCatalog"; @@ -80,4 +80,4 @@ public class GobblinClusterConfigurationKeys { public static final String STOP_TIMEOUT_SECONDS = GOBBLIN_CLUSTER_PREFIX + "stopTimeoutSeconds"; public static final long DEFAULT_STOP_TIMEOUT_SECONDS = 60; -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java index 9290b5a..0f2d356 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/ScheduledJobConfigurationManager.java @@ -27,24 +27,23 @@ import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.reflect.ConstructorUtils; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.base.Optional; import com.google.common.collect.Maps; import com.google.common.eventbus.EventBus; import com.typesafe.config.Config; +import org.apache.commons.lang3.reflect.ConstructorUtils; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.JobSpec; import 
org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecExecutor; @Alpha @@ -59,9 +58,9 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager { private final ScheduledExecutorService fetchJobSpecExecutor; - private final SpecExecutorInstanceConsumer specExecutorInstanceConsumer; + private final SpecConsumer _specConsumer; - private final ClassAliasResolver<SpecExecutorInstanceConsumer> aliasResolver; + private final ClassAliasResolver<SpecConsumer> aliasResolver; public ScheduledJobConfigurationManager(EventBus eventBus, Config config) { super(eventBus, config); @@ -73,17 +72,17 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager { this.fetchJobSpecExecutor = Executors.newSingleThreadScheduledExecutor( ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor"))); - this.aliasResolver = new ClassAliasResolver<>(SpecExecutorInstanceConsumer.class); + this.aliasResolver = new ClassAliasResolver<>(SpecConsumer.class); try { - String specExecutorInstanceConsumerClassName = GobblinClusterConfigurationKeys.DEFAULT_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS; - if (config.hasPath(GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY)) { - specExecutorInstanceConsumerClassName = config.getString(GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY); + String specConsumerClassName = GobblinClusterConfigurationKeys.DEFAULT_SPEC_CONSUMER_CLASS; + if (config.hasPath(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY)) { + specConsumerClassName = 
config.getString(GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY); } - LOGGER.info("Using SpecExecutorInstanceConsumer ClassNameclass name/alias " + specExecutorInstanceConsumerClassName); - this.specExecutorInstanceConsumer = (SpecExecutorInstanceConsumer) ConstructorUtils - .invokeConstructor(Class.forName(this.aliasResolver.resolve( specExecutorInstanceConsumerClassName)), config); + LOGGER.info("Using SpecConsumer ClassNameclass name/alias " + specConsumerClassName); + this._specConsumer = (SpecConsumer) ConstructorUtils + .invokeConstructor(Class.forName(this.aliasResolver.resolve(specConsumerClassName)), config); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException - | ClassNotFoundException e) { + | ClassNotFoundException e) { throw new RuntimeException(e); } } @@ -116,25 +115,25 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager { * @throws InterruptedException */ private void fetchJobSpecs() throws ExecutionException, InterruptedException { - List<Pair<SpecExecutorInstance.Verb, Spec>> changesSpecs = - (List<Pair<SpecExecutorInstance.Verb, Spec>>) this.specExecutorInstanceConsumer.changedSpecs().get(); + List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = + (List<Pair<SpecExecutor.Verb, Spec>>) this._specConsumer.changedSpecs().get(); - for (Pair<SpecExecutorInstance.Verb, Spec> entry : changesSpecs) { + for (Pair<SpecExecutor.Verb, Spec> entry : changesSpecs) { - SpecExecutorInstance.Verb verb = entry.getKey(); - if (verb.equals(SpecExecutorInstance.Verb.ADD)) { + SpecExecutor.Verb verb = entry.getKey(); + if (verb.equals(SpecExecutor.Verb.ADD)) { // Handle addition JobSpec jobSpec = (JobSpec) entry.getValue(); postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties()); jobSpecs.put(entry.getValue().getUri(), (JobSpec) entry.getValue()); - } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.UPDATE)) { + } else if 
(verb.equals(SpecExecutor.Verb.UPDATE)) { // Handle update JobSpec jobSpec = (JobSpec) entry.getValue(); postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties()); jobSpecs.put(entry.getValue().getUri(), (JobSpec) entry.getValue()); - } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.DELETE)) { + } else if (verb.equals(SpecExecutor.Verb.DELETE)) { // Handle delete Spec anonymousSpec = (Spec) entry.getValue(); @@ -148,4 +147,4 @@ public class ScheduledJobConfigurationManager extends JobConfigurationManager { protected void shutDown() throws Exception { ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER)); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java ---------------------------------------------------------------------- diff --git a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java index e660710..7370dc6 100644 --- a/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java +++ b/gobblin-cluster/src/main/java/org/apache/gobblin/cluster/StreamingJobConfigurationManager.java @@ -25,30 +25,29 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import org.apache.commons.lang3.tuple.Pair; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import com.google.common.base.Optional; import com.google.common.collect.ImmutableList; import com.google.common.eventbus.EventBus; import com.google.common.util.concurrent.Service; import com.typesafe.config.Config; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import 
org.apache.gobblin.annotation.Alpha; import org.apache.gobblin.runtime.api.JobSpec; import org.apache.gobblin.runtime.api.MutableJobCatalog; import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceConsumer; +import org.apache.gobblin.runtime.api.SpecExecutor; import org.apache.gobblin.util.ClassAliasResolver; import org.apache.gobblin.util.ConfigUtils; import org.apache.gobblin.util.ExecutorsUtils; import org.apache.gobblin.util.reflection.GobblinConstructorUtils; +import org.apache.gobblin.runtime.api.SpecConsumer; /** - * A {@link JobConfigurationManager} that fetches job specs from a {@link SpecExecutorInstanceConsumer} in a loop + * A {@link JobConfigurationManager} that fetches job specs from a {@link SpecConsumer} in a loop * without */ @Alpha @@ -57,7 +56,7 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { private final ExecutorService fetchJobSpecExecutor; - private final SpecExecutorInstanceConsumer specExecutorInstanceConsumer; + private final SpecConsumer specConsumer; private final long stopTimeoutSeconds; @@ -71,23 +70,23 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { ExecutorsUtils.newThreadFactory(Optional.of(LOGGER), Optional.of("FetchJobSpecExecutor"))); String specExecutorInstanceConsumerClassName = - ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS_KEY, - GobblinClusterConfigurationKeys.DEFAULT_STREAMING_SPEC_EXECUTOR_INSTANCE_CONSUMER_CLASS); + ConfigUtils.getString(config, GobblinClusterConfigurationKeys.SPEC_CONSUMER_CLASS_KEY, + GobblinClusterConfigurationKeys.DEFAULT_STREAMING_SPEC_CONSUMER_CLASS); - LOGGER.info("Using SpecExecutorInstanceConsumer ClassNameclass name/alias " + + LOGGER.info("Using SpecConsumer ClassNameclass name/alias " + specExecutorInstanceConsumerClassName); try { - 
ClassAliasResolver<SpecExecutorInstanceConsumer> aliasResolver = - new ClassAliasResolver<>(SpecExecutorInstanceConsumer.class); + ClassAliasResolver<SpecConsumer> aliasResolver = + new ClassAliasResolver<>(SpecConsumer.class); - this.specExecutorInstanceConsumer = (SpecExecutorInstanceConsumer) GobblinConstructorUtils.invokeFirstConstructor( + this.specConsumer = (SpecConsumer) GobblinConstructorUtils.invokeFirstConstructor( Class.forName(aliasResolver.resolve(specExecutorInstanceConsumerClassName)), ImmutableList.<Object>of(config, jobCatalog), ImmutableList.<Object>of(config)); } catch (NoSuchMethodException | IllegalAccessException | InvocationTargetException | InstantiationException - | ClassNotFoundException e) { - throw new RuntimeException("Could not construct SpecExecutorInstanceConsumer " + + | ClassNotFoundException e) { + throw new RuntimeException("Could not construct SpecConsumer " + specExecutorInstanceConsumerClassName, e); } } @@ -97,8 +96,8 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { LOGGER.info("Starting the " + StreamingJobConfigurationManager.class.getSimpleName()); // if the instance consumer is a service then need to start it to consume job specs - if (this.specExecutorInstanceConsumer instanceof Service) { - ((Service) this.specExecutorInstanceConsumer).startAsync().awaitRunning(); + if (this.specConsumer instanceof Service) { + ((Service) this.specConsumer).startAsync().awaitRunning(); } // submit command to fetch job specs @@ -120,25 +119,25 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { } private void fetchJobSpecs() throws ExecutionException, InterruptedException { - List<Pair<SpecExecutorInstance.Verb, Spec>> changesSpecs = - (List<Pair<SpecExecutorInstance.Verb, Spec>>) this.specExecutorInstanceConsumer.changedSpecs().get(); + List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = + (List<Pair<SpecExecutor.Verb, Spec>>) this.specConsumer.changedSpecs().get(); // 
propagate thread interruption so that caller will exit from loop if (Thread.interrupted()) { throw new InterruptedException(); } - for (Pair<SpecExecutorInstance.Verb, Spec> entry : changesSpecs) { - SpecExecutorInstance.Verb verb = entry.getKey(); - if (verb.equals(SpecExecutorInstance.Verb.ADD)) { + for (Pair<SpecExecutor.Verb, Spec> entry : changesSpecs) { + SpecExecutor.Verb verb = entry.getKey(); + if (verb.equals(SpecExecutor.Verb.ADD)) { // Handle addition JobSpec jobSpec = (JobSpec) entry.getValue(); postNewJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties()); - } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.UPDATE)) { + } else if (verb.equals(SpecExecutor.Verb.UPDATE)) { // Handle update JobSpec jobSpec = (JobSpec) entry.getValue(); postUpdateJobConfigArrival(jobSpec.getUri().toString(), jobSpec.getConfigAsProperties()); - } else if (verb.equals(SpecExecutorInstanceConsumer.Verb.DELETE)) { + } else if (verb.equals(SpecExecutor.Verb.DELETE)) { // Handle delete Spec anonymousSpec = (Spec) entry.getValue(); postDeleteJobConfigArrival(anonymousSpec.getUri().toString(), new Properties()); @@ -148,11 +147,11 @@ public class StreamingJobConfigurationManager extends JobConfigurationManager { @Override protected void shutDown() throws Exception { - if (this.specExecutorInstanceConsumer instanceof Service) { - ((Service) this.specExecutorInstanceConsumer).stopAsync().awaitTerminated(this.stopTimeoutSeconds, + if (this.specConsumer instanceof Service) { + ((Service) this.specConsumer).stopAsync().awaitTerminated(this.stopTimeoutSeconds, TimeUnit.SECONDS); } ExecutorsUtils.shutdownExecutorService(this.fetchJobSpecExecutor, Optional.of(LOGGER)); } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java 
---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java new file mode 100644 index 0000000..2e06817 --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutor.java @@ -0,0 +1,76 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.service.modules.orchestration; + +import java.util.concurrent.Future; + +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; +import org.apache.gobblin.util.CompletedFuture; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; +import com.typesafe.config.ConfigFactory; + + +public class AzkabanSpecExecutor extends AbstractSpecExecutor { + + // Executor Instance + protected final Config _config; + + private SpecProducer<Spec> azkabanSpecProducer; + + public AzkabanSpecExecutor(Config config, Optional<Logger> log) { + super(config, log); + Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE); + _config = config.withFallback(defaultConfig); + azkabanSpecProducer = new AzkabanSpecProducer(_config, log); + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); + } + + + @Override + public Future<? 
extends SpecProducer> getProducer() { + return new CompletedFuture<>(this.azkabanSpecProducer, null); + } + + @Override + public Future<Config> getConfig() { + return new CompletedFuture<>(_config, null); + } + + @Override + public Future<String> getHealth() { + return new CompletedFuture<>("Healthy", null); + } + + @Override + protected void startUp() throws Exception { + // nothing to do in default implementation + } + + @Override + protected void shutDown() throws Exception { + // nothing to do in default implementation + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java deleted file mode 100644 index dcc89cc..0000000 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstance.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
- * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.gobblin.service.modules.orchestration; - -import java.net.URI; -import java.net.URISyntaxException; -import java.util.List; -import java.util.Map; -import java.util.concurrent.Future; - -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.runtime.api.SpecExecutorInstance; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Optional; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Maps; -import com.google.common.util.concurrent.AbstractIdleService; -import com.typesafe.config.Config; -import com.typesafe.config.ConfigFactory; - - -public class AzkabanSpecExecutorInstance extends AbstractIdleService implements SpecExecutorInstance { - protected static final Splitter SPLIT_BY_COMMA = Splitter.on(",").omitEmptyStrings().trimResults(); - protected static final Splitter SPLIT_BY_COLON = Splitter.on(":").omitEmptyStrings().trimResults(); - - // Executor Instance - protected final Config _config; - protected final Logger _log; - protected final URI _specExecutorInstanceUri; - protected final Map<String, String> _capabilities; - - public AzkabanSpecExecutorInstance(Config config, Optional<Logger> log) { - Config defaultConfig = ConfigFactory.load(ServiceAzkabanConfigKeys.DEFAULT_AZKABAN_PROJECT_CONFIG_FILE); - _config = config.withFallback(defaultConfig); - _log = log.isPresent() ? 
log.get() : LoggerFactory.getLogger(getClass()); - try { - _specExecutorInstanceUri = new URI(ConfigUtils.getString(config, ConfigurationKeys.SPECEXECUTOR_INSTANCE_URI_KEY, - "NA")); - } catch (URISyntaxException e) { - throw new RuntimeException(e); - } - _capabilities = Maps.newHashMap(); - if (config.hasPath(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY)) { - String capabilitiesStr = config.getString(ConfigurationKeys.SPECEXECUTOR_INSTANCE_CAPABILITIES_KEY); - List<String> capabilities = SPLIT_BY_COMMA.splitToList(capabilitiesStr); - for (String capability : capabilities) { - List<String> currentCapability = SPLIT_BY_COLON.splitToList(capability); - Preconditions.checkArgument(currentCapability.size() == 2, "Only one source:destination pair is supported " - + "per capability, found: " + currentCapability); - _capabilities.put(currentCapability.get(0), currentCapability.get(1)); - } - } - } - - @Override - public URI getUri() { - return _specExecutorInstanceUri; - } - - @Override - public Future<String> getDescription() { - return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + _specExecutorInstanceUri, null); - } - - @Override - public Future<Config> getConfig() { - return new CompletedFuture<>(_config, null); - } - - @Override - public Future<String> getHealth() { - return new CompletedFuture<>("Healthy", null); - } - - @Override - public Future<? 
extends Map<String, String>> getCapabilities() { - return new CompletedFuture<>(_capabilities, null); - } - - @Override - protected void startUp() throws Exception { - // nothing to do in default implementation - } - - @Override - protected void shutDown() throws Exception { - // nothing to do in default implementation - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java deleted file mode 100644 index 47df250..0000000 --- a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecExecutorInstanceProducer.java +++ /dev/null @@ -1,176 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. 
- */ -package org.apache.gobblin.service.modules.orchestration; - -import java.io.Closeable; -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.Future; - -import org.apache.commons.codec.EncoderException; -import org.apache.commons.lang3.StringUtils; -import org.apache.gobblin.runtime.api.JobSpec; -import org.apache.gobblin.runtime.api.Spec; -import org.apache.gobblin.runtime.api.SpecExecutorInstanceProducer; -import org.apache.gobblin.util.CompletedFuture; -import org.apache.gobblin.util.ConfigUtils; -import org.slf4j.Logger; - -import com.google.common.base.Optional; -import com.typesafe.config.Config; - - -public class AzkabanSpecExecutorInstanceProducer extends AzkabanSpecExecutorInstance - implements SpecExecutorInstanceProducer<Spec>, Closeable { - - // Session Id for GaaS User - private String _sessionId; - - - public AzkabanSpecExecutorInstanceProducer(Config config, Optional<Logger> log) { - super(config, log); - - try { - // Initialize Azkaban client / producer and cache credentials - String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY); - String azkabanPassword = getAzkabanPassword(_config); - String azkabanServerUrl = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY); - - _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl); - } catch (IOException | EncoderException e) { - throw new RuntimeException("Could not authenticate with Azkaban", e); - } - } - - private String getAzkabanPassword(Config config) { - if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) { - return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY); - } - - return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY); - } - - public AzkabanSpecExecutorInstanceProducer(Config config, Logger log) { - 
this(config, Optional.of(log)); - } - - /** Constructor with no logging */ - public AzkabanSpecExecutorInstanceProducer(Config config) { - this(config, Optional.<Logger>absent()); - } - - @Override - public void close() throws IOException { - - } - - @Override - public Future<?> addSpec(Spec addedSpec) { - // If project already exists, execute it - try { - AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec); - boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig); - - // If project does not already exists, create and execute it - if (azkabanProjectExists) { - _log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName()); - AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig), - azkabanProjectConfig); - } else { - _log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName()); - - // Deleted project also returns true if-project-exists check, so optimistically first create the project - // .. (it will create project if it was never created or deleted), if project exists it will fail with - // .. appropriate exception message, catch that and run in replace project mode if force overwrite is - // .. 
specified - try { - createNewAzkabanProject(_sessionId, azkabanProjectConfig); - } catch (IOException e) { - if ("Project already exists.".equalsIgnoreCase(e.getMessage())) { - if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(), - ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) { - _log.info("Project already exists for this Spec, but force overwrite specified"); - updateExistingAzkabanProject(_sessionId, azkabanProjectConfig); - } else { - _log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s", - azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); - } - } else { - throw e; - } - } - } - - - } catch (IOException e) { - throw new RuntimeException("Issue in setting up Azkaban project.", e); - } - - return new CompletedFuture<>(_config, null); - } - - @Override - public Future<?> updateSpec(Spec updatedSpec) { - // Re-create project - AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec); - - try { - updateExistingAzkabanProject(_sessionId, azkabanProjectConfig); - } catch (IOException e) { - throw new RuntimeException("Issue in setting up Azkaban project.", e); - } - - return new CompletedFuture<>(_config, null); - } - - @Override - public Future<?> deleteSpec(URI deletedSpecURI) { - // Delete project - throw new UnsupportedOperationException(); - } - - @Override - public Future<? 
extends List<Spec>> listSpecs() { - throw new UnsupportedOperationException(); - } - - private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { - // Create Azkaban Job - String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig); - - // Schedule Azkaban Job - AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig); - - _log.info(String.format("Azkaban project created: %smanager?project=%s", - azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName())); - } - - private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException { - _log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(), - azkabanProjectConfig.getAzkabanProjectName())); - - // Get project Id - String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig); - - // Replace Azkaban Job - AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig); - - // Change schedule - AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig); - } -} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java new file mode 100644 index 0000000..5a491ab --- /dev/null +++ b/gobblin-modules/gobblin-azkaban/src/main/java/org/apache/gobblin/service/modules/orchestration/AzkabanSpecProducer.java @@ -0,0 +1,176 @@ +/* + * Licensed to the 
Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.service.modules.orchestration; + +import java.io.Closeable; +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.concurrent.Future; + +import org.apache.commons.codec.EncoderException; +import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.slf4j.Logger; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; + +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public class AzkabanSpecProducer implements SpecProducer<Spec>, Closeable { + + // Session Id for GaaS User + private String _sessionId; + private Config _config; + + public AzkabanSpecProducer(Config config, Optional<Logger> log) { + this._config = config; + try { + // Initialize Azkaban client / producer and cache credentials + String azkabanUsername = _config.getString(ServiceAzkabanConfigKeys.AZKABAN_USERNAME_KEY); + String azkabanPassword = getAzkabanPassword(_config); + String azkabanServerUrl = 
_config.getString(ServiceAzkabanConfigKeys.AZKABAN_SERVER_URL_KEY);
+      // Authenticate once up front; addSpec/updateSpec below reuse the cached _sessionId for their Azkaban calls.
+      _sessionId = AzkabanAjaxAPIClient.authenticateAndGetSessionId(azkabanUsername, azkabanPassword, azkabanServerUrl);
+    } catch (IOException | EncoderException e) {
+      throw new RuntimeException("Could not authenticate with Azkaban", e);
+    }
+  }
+  /** Resolves the Azkaban password: a non-blank JVM system property wins, else the config value ("" if unset). */
+  private String getAzkabanPassword(Config config) {
+    if (StringUtils.isNotBlank(System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY))) {
+      return System.getProperty(ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_SYSTEM_KEY);
+    }
+
+    return ConfigUtils.getString(config, ServiceAzkabanConfigKeys.AZKABAN_PASSWORD_KEY, StringUtils.EMPTY);
+  }
+  /** Convenience constructor: wraps {@code log} in an {@link Optional} and delegates. */
+  public AzkabanSpecProducer(Config config, Logger log) {
+    this(config, Optional.of(log));
+  }
+
+  /** Constructor with no logging */
+  public AzkabanSpecProducer(Config config) {
+    this(config, Optional.<Logger>absent());
+  }
+
+  @Override
+  public void close() throws IOException {
+    // Nothing is released here. NOTE(review): confirm the cached Azkaban session needs no explicit logout.
+  }
+  /** Executes the Azkaban project of {@code addedSpec} (must be a JobSpec) if present, else creates it; IOExceptions become RuntimeExceptions. */
+  @Override
+  public Future<?> addSpec(Spec addedSpec) {
+    // Existing project: trigger an execution. Otherwise: create it (or overwrite it, if configured).
+    try {
+      AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) addedSpec);
+      boolean azkabanProjectExists = AzkabanJobHelper.isAzkabanJobPresent(_sessionId, azkabanProjectConfig);
+
+      // Project already exists: just execute it
+      if (azkabanProjectExists) {
+        log.info("Executing Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+        AzkabanJobHelper.executeJob(_sessionId, AzkabanJobHelper.getProjectId(_sessionId, azkabanProjectConfig),
+            azkabanProjectConfig);
+      } else {
+        log.info("Setting up Azkaban Project: " + azkabanProjectConfig.getAzkabanProjectName());
+
+        // Optimistically create the project first (works if it was never created or was deleted; a deleted
+        // project reportedly also passes the if-project-exists check). If it already exists, creation fails with
+        // an IOException whose message is "Project already exists."; the project is then replaced when force
+        // overwrite (AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY) is set, otherwise only logged. Other IOExceptions propagate.
+        try {
+          createNewAzkabanProject(_sessionId, azkabanProjectConfig);
+        } catch (IOException e) {
+          if ("Project already exists.".equalsIgnoreCase(e.getMessage())) { // relies on the exact exception message text
+            if (ConfigUtils.getBoolean(((JobSpec) addedSpec).getConfig(),
+                ServiceAzkabanConfigKeys.AZKABAN_PROJECT_OVERWRITE_IF_EXISTS_KEY, false)) {
+              log.info("Project already exists for this Spec, but force overwrite specified");
+              updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+            } else {
+              log.info(String.format("Azkaban project already exists: " + "%smanager?project=%s",
+                  azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+            }
+          } else {
+            throw e;
+          }
+        }
+      }
+
+
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+    // Return a CompletedFuture wrapping the producer config.
+    return new CompletedFuture<>(_config, null);
+  }
+  /** Replaces the job and schedule of the existing Azkaban project for {@code updatedSpec} (must be a JobSpec). */
+  @Override
+  public Future<?> updateSpec(Spec updatedSpec) {
+    // Re-create project: replace its job and change its schedule via updateExistingAzkabanProject
+    AzkabanProjectConfig azkabanProjectConfig = new AzkabanProjectConfig((JobSpec) updatedSpec);
+
+    try {
+      updateExistingAzkabanProject(_sessionId, azkabanProjectConfig);
+    } catch (IOException e) {
+      throw new RuntimeException("Issue in setting up Azkaban project.", e);
+    }
+
+    return new CompletedFuture<>(_config, null);
+  }
+  /** Not supported: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public Future<?> deleteSpec(URI deletedSpecURI) {
+    // Deleting Azkaban projects is not implemented.
+    throw new UnsupportedOperationException();
+  }
+  /** Not supported: always throws {@link UnsupportedOperationException}. */
+  @Override
+  public Future<? extends List<Spec>> listSpecs() {
+    throw new UnsupportedOperationException();
+  }
+  /** Creates the Azkaban project/job described by {@code azkabanProjectConfig} and schedules it. */
+  private void createNewAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    // Create Azkaban Job
+    String azkabanProjectId = AzkabanJobHelper.createAzkabanJob(sessionId, azkabanProjectConfig);
+
+    // Schedule Azkaban Job
+    AzkabanJobHelper.scheduleJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    log.info(String.format("Azkaban project created: %smanager?project=%s",
+        azkabanProjectConfig.getAzkabanServerUrl(), azkabanProjectConfig.getAzkabanProjectName()));
+  }
+  /** Looks up the existing project's id, replaces its job, and changes its schedule. */
+  private void updateExistingAzkabanProject(String sessionId, AzkabanProjectConfig azkabanProjectConfig) throws IOException {
+    log.info(String.format("Updating project: %smanager?project=%s", azkabanProjectConfig.getAzkabanServerUrl(),
+        azkabanProjectConfig.getAzkabanProjectName()));
+
+    // Get project Id
+    String azkabanProjectId = AzkabanJobHelper.getProjectId(sessionId, azkabanProjectConfig);
+
+    // Replace Azkaban Job
+    AzkabanJobHelper.replaceAzkabanJob(sessionId, azkabanProjectId, azkabanProjectConfig);
+
+    // Change schedule
+    AzkabanJobHelper.changeJobSchedule(sessionId, azkabanProjectId, azkabanProjectConfig);
+  }
+}

http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
new file mode 100644
index 0000000..083ccf3
--- /dev/null
+++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecConsumer.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license
agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.service; + +import java.io.ByteArrayInputStream; +import java.io.Closeable; +import java.io.DataInputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Properties; +import java.util.concurrent.Future; +import java.util.regex.Pattern; + +import com.google.common.base.Optional; +import com.google.common.collect.Lists; +import com.typesafe.config.Config; + +import org.apache.avro.io.BinaryDecoder; +import org.apache.avro.io.Decoder; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.specific.SpecificDatumReader; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.kafka.client.ByteArrayBasedKafkaRecord; +import org.apache.gobblin.kafka.client.DecodeableKafkaRecord; +import org.apache.gobblin.kafka.client.GobblinKafkaConsumerClient; +import org.apache.gobblin.kafka.client.Kafka08ConsumerClient; +import org.apache.gobblin.kafka.client.KafkaConsumerRecord; +import 
org.apache.gobblin.metrics.reporter.util.FixedSchemaVersionWriter; +import org.apache.gobblin.metrics.reporter.util.SchemaVersionWriter; +import org.apache.gobblin.runtime.api.JobSpec; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.job_spec.AvroJobSpec; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaOffsetRetrievalFailureException; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaPartition; +import org.apache.gobblin.source.extractor.extract.kafka.KafkaTopic; +import org.apache.gobblin.util.CompletedFuture; +import static org.apache.gobblin.service.SimpleKafkaSpecExecutor.*; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class SimpleKafkaSpecConsumer implements SpecConsumer<Spec>, Closeable { + + // Consumer + protected final GobblinKafkaConsumerClient _kafka08Consumer; + protected final List<KafkaPartition> _partitions; + protected final List<Long> _lowWatermark; + protected final List<Long> _nextWatermark; + protected final List<Long> _highWatermark; + + private Iterator<KafkaConsumerRecord> messageIterator = null; + private int currentPartitionIdx = -1; + private boolean isFirstRun = true; + + private final BinaryDecoder _decoder; + private final SpecificDatumReader<AvroJobSpec> _reader; + private final SchemaVersionWriter<?> _versionWriter; + + public SimpleKafkaSpecConsumer(Config config, Optional<Logger> log) { + + // Consumer + _kafka08Consumer = new Kafka08ConsumerClient.Factory().create(config); + List<KafkaTopic> kafkaTopics = _kafka08Consumer.getFilteredTopics(Collections.EMPTY_LIST, + Lists.newArrayList(Pattern.compile(config.getString(SimpleKafkaSpecExecutor.SPEC_KAFKA_TOPICS_KEY)))); + _partitions = kafkaTopics.get(0).getPartitions(); + _lowWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + _nextWatermark = Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + _highWatermark = 
Lists.newArrayList(Collections.nCopies(_partitions.size(), 0L)); + + InputStream dummyInputStream = new ByteArrayInputStream(new byte[0]); + _decoder = DecoderFactory.get().binaryDecoder(dummyInputStream, null); + _reader = new SpecificDatumReader<AvroJobSpec>(AvroJobSpec.SCHEMA$); + _versionWriter = new FixedSchemaVersionWriter(); + } + + public SimpleKafkaSpecConsumer(Config config, Logger log) { + this(config, Optional.of(log)); + } + + /** Constructor with no logging */ + public SimpleKafkaSpecConsumer(Config config) { + this(config, Optional.<Logger>absent()); + } + + @Override + public Future<? extends List<Pair<SpecExecutor.Verb, Spec>>> changedSpecs() { + List<Pair<SpecExecutor.Verb, Spec>> changesSpecs = new ArrayList<>(); + initializeWatermarks(); + this.currentPartitionIdx = -1; + while (!allPartitionsFinished()) { + if (currentPartitionFinished()) { + moveToNextPartition(); + continue; + } + if (this.messageIterator == null || !this.messageIterator.hasNext()) { + try { + this.messageIterator = fetchNextMessageBuffer(); + } catch (Exception e) { + log.error(String.format("Failed to fetch next message buffer for partition %s. Will skip this partition.", + getCurrentPartition()), e); + moveToNextPartition(); + continue; + } + if (this.messageIterator == null || !this.messageIterator.hasNext()) { + moveToNextPartition(); + continue; + } + } + while (!currentPartitionFinished()) { + if (!this.messageIterator.hasNext()) { + break; + } + + KafkaConsumerRecord nextValidMessage = this.messageIterator.next(); + + // Even though we ask Kafka to give us a message buffer starting from offset x, it may + // return a buffer that starts from offset smaller than x, so we need to skip messages + // until we get to x. 
+ if (nextValidMessage.getOffset() < _nextWatermark.get(this.currentPartitionIdx)) { + continue; + } + + _nextWatermark.set(this.currentPartitionIdx, nextValidMessage.getNextOffset()); + try { + final AvroJobSpec record; + + if (nextValidMessage instanceof ByteArrayBasedKafkaRecord) { + record = decodeRecord((ByteArrayBasedKafkaRecord)nextValidMessage); + } else if (nextValidMessage instanceof DecodeableKafkaRecord){ + record = ((DecodeableKafkaRecord<?, AvroJobSpec>) nextValidMessage).getValue(); + } else { + throw new IllegalStateException( + "Unsupported KafkaConsumerRecord type. The returned record can either be ByteArrayBasedKafkaRecord" + + " or DecodeableKafkaRecord"); + } + + JobSpec.Builder jobSpecBuilder = JobSpec.builder(record.getUri()); + + Properties props = new Properties(); + props.putAll(record.getProperties()); + jobSpecBuilder.withJobCatalogURI(record.getUri()).withVersion(record.getVersion()) + .withDescription(record.getDescription()).withConfigAsProperties(props); + + if (!record.getTemplateUri().isEmpty()) { + jobSpecBuilder.withTemplate(new URI(record.getTemplateUri())); + } + + String verbName = record.getMetadata().get(VERB_KEY); + Verb verb = Verb.valueOf(verbName); + + changesSpecs.add(new ImmutablePair<Verb, Spec>(verb, jobSpecBuilder.build())); + } catch (Throwable t) { + log.error("Could not decode record at partition " + this.currentPartitionIdx + + " offset " + nextValidMessage.getOffset()); + } + } + } + + return new CompletedFuture(changesSpecs, null); + } + + private void initializeWatermarks() { + initializeLowWatermarks(); + initializeHighWatermarks(); + } + + private void initializeLowWatermarks() { + try { + int i=0; + for (KafkaPartition kafkaPartition : _partitions) { + if (isFirstRun) { + long earliestOffset = _kafka08Consumer.getEarliestOffset(kafkaPartition); + _lowWatermark.set(i, earliestOffset); + } else { + _lowWatermark.set(i, _highWatermark.get(i)); + } + i++; + } + isFirstRun = false; + } catch 
(KafkaOffsetRetrievalFailureException e) { + throw new RuntimeException(e); + } + } + + private void initializeHighWatermarks() { + try { + int i=0; + for (KafkaPartition kafkaPartition : _partitions) { + long latestOffset = _kafka08Consumer.getLatestOffset(kafkaPartition); + _highWatermark.set(i, latestOffset); + i++; + } + } catch (KafkaOffsetRetrievalFailureException e) { + throw new RuntimeException(e); + } + } + + private boolean allPartitionsFinished() { + return this.currentPartitionIdx >= _nextWatermark.size(); + } + + private boolean currentPartitionFinished() { + if (this.currentPartitionIdx == -1) { + return true; + } else if (_nextWatermark.get(this.currentPartitionIdx) >= _highWatermark.get(this.currentPartitionIdx)) { + return true; + } else { + return false; + } + } + + private int moveToNextPartition() { + this.messageIterator = null; + return this.currentPartitionIdx ++; + } + + private KafkaPartition getCurrentPartition() { + return _partitions.get(this.currentPartitionIdx); + } + + private Iterator<KafkaConsumerRecord> fetchNextMessageBuffer() { + return _kafka08Consumer.consume(_partitions.get(this.currentPartitionIdx), + _nextWatermark.get(this.currentPartitionIdx), _highWatermark.get(this.currentPartitionIdx)); + } + + private AvroJobSpec decodeRecord(ByteArrayBasedKafkaRecord kafkaConsumerRecord) throws IOException { + InputStream is = new ByteArrayInputStream(kafkaConsumerRecord.getMessageBytes()); + _versionWriter.readSchemaVersioningInformation(new DataInputStream(is)); + + Decoder decoder = DecoderFactory.get().binaryDecoder(is, _decoder); + + return _reader.read(null, decoder); + } + + @Override + public void close() throws IOException { + _kafka08Consumer.close(); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/9402a903/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java 
---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java new file mode 100644 index 0000000..8545bf6 --- /dev/null +++ b/gobblin-modules/gobblin-kafka-08/src/main/java/org/apache/gobblin/service/SimpleKafkaSpecExecutor.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.gobblin.service; + +import java.io.Serializable; +import java.net.URI; +import java.net.URISyntaxException; +import java.util.concurrent.Future; + +import com.google.common.base.Optional; +import com.typesafe.config.Config; +import com.google.common.io.Closer; + +import org.slf4j.Logger; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.runtime.api.Spec; +import org.apache.gobblin.util.CompletedFuture; +import org.apache.gobblin.util.ConfigUtils; +import org.apache.gobblin.runtime.api.SpecExecutor; +import org.apache.gobblin.runtime.api.SpecConsumer; +import org.apache.gobblin.runtime.api.SpecProducer; +import org.apache.gobblin.runtime.spec_executorInstance.AbstractSpecExecutor; + +/** + * An {@link SpecExecutor} that use Kafka as the communication mechanism. + */ +public class SimpleKafkaSpecExecutor extends AbstractSpecExecutor { + public static final String SPEC_KAFKA_TOPICS_KEY = "spec.kafka.topics"; + + + protected static final String VERB_KEY = "Verb"; + + private SpecProducer<Spec> specProducer; + + public SimpleKafkaSpecExecutor(Config config, Optional<Logger> log) { + super(config, log); + specProducer = new SimpleKafkaSpecProducer(config, log); + } + + /** + * Constructor with no logging, necessary for simple use case. + * @param config + */ + public SimpleKafkaSpecExecutor(Config config) { + this(config, Optional.absent()); + } + + @Override + public Future<? 
extends SpecProducer> getProducer() { + return new CompletedFuture<>(this.specProducer, null); + } + + @Override + public Future<String> getDescription() { + return new CompletedFuture<>("SimpleSpecExecutorInstance with URI: " + specExecutorInstanceUri, null); + } + + @Override + protected void startUp() throws Exception { + optionalCloser = Optional.of(Closer.create()); + specProducer = optionalCloser.get().register((SimpleKafkaSpecProducer) specProducer); + } + + @Override + protected void shutDown() throws Exception { + if (optionalCloser.isPresent()) { + optionalCloser.get().close(); + } else { + log.warn("There's no Closer existed in " + this.getClass().getName()); + } + } + + public static class SpecExecutorInstanceDataPacket implements Serializable { + + protected Verb _verb; + protected URI _uri; + protected Spec _spec; + + public SpecExecutorInstanceDataPacket(Verb verb, URI uri, Spec spec) { + _verb = verb; + _uri = uri; + _spec = spec; + } + + @Override + public String toString() { + return String.format("Verb: %s, URI: %s, Spec: %s", _verb, _uri, _spec); + } + } +} \ No newline at end of file
