SAMZA-1814: consolidate JobNode and JobGraph configuration generation for high and low-level API applications
High-level changes:
- Move configuration generation to JobNodeConfigurationGenerator
- Move the intermediate stream partition calculation to IntermediateStreamManager
- Consolidate the code in JobPlanner and ExecutionPlanner for high- and low-level API plan/configuration generation

Author: Yi Pan (Data Infrastructure) <[email protected]>
Author: Prateek Maheshwari <[email protected]>
Author: prateekm <[email protected]>

Reviewers: Prateek Maheshwari <[email protected]>, Cameron Lee <[email protected]>

Closes #642 from nickpan47/SAMZA-1814 and squashes the following commits:

214373966 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814. With minor fixes to allow merge correctly.
f8c8108ac [Yi Pan (Data Infrastructure)] SAMZA-1814: Fix merging errors.
c8681a028 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
b66b9fa9d [Yi Pan (Data Infrastructure)] SAMZA-1814: moving serde generation to a single top-level configuration generation, not embedded in table. Address review comments
0db5068dd [Yi Pan (Data Infrastructure)] SAMZA-1814: fix merge issue and consolidated some test classes
2c856c5f5 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation for high and low-level APIs
ffc6f1a70 [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate configuration generation in ExecutionPlanner between high and low-level API applications
c7fde4a03 [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
44844635b [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
8797cdd4f [Yi Pan (Data Infrastructure)] SAMZA-1814: merge with master
dae98cebe [Yi Pan (Data Infrastructure)] SAMZA-1814: consolidate the configure generation between high and low-level API applications
3a91b9a62 [Yi Pan (Data Infrastructure)] SAMZA-1814: moving some logic to ApplicationDescriptorImpl to simplify the JobGraph/JobNode code
97c00a2e0 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP unit tests fixed for configure generation.
05637e6e6 [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP consolidate all JobGraph and JobNode Json and Config generation code to support both high- and low-level applications
9d564642a [Yi Pan (Data Infrastructure)] Merge branch 'master' into SAMZA-1814
16bef1b1b [Yi Pan (Data Infrastructure)] SAMZA-1814: WIP fixing the task application configuration generation in the planner
66af5b706 [Yi Pan (Data Infrastructure)] SAMZA-1789: addressing Cameron's review comments.
ec4bb1dca [Yi Pan (Data Infrastructure)] SAMZA-1789: merge with fix for SAMZA-1836
9c89c63dc [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
91fcd73ae [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
34ffda8ae [Yi Pan (Data Infrastructure)] SAMZA-1789: disabling tests due to SAMZA-1836
02076c850 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed the modifier for the mandatory constructor of ApplicationRunner; Disabled three tests due to wrong configure for test systems
222abf21f [Yi Pan (Data Infrastructure)] SAMZA-1789: added a constructor to StreamProcessor to take a StreamProcessorListenerFactory
7a73992a5 [Yi Pan (Data Infrastructure)] SAMZA-1789: fixing checkstyle and javadoc errors
9997b98bb [Yi Pan (Data Infrastructure)] SAMZA-1789: renamed all ApplicationDescriptor classes with full-spelling of Application
f4b3d43a4 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fxing TaskApplication examples and some checkstyle errors
f2969f8df [Yi Pan (Data Infrastructure)] SAMZA-1789: fixed ApplicationDescriptor to use InputDescriptor and OutputDescriptor; addressed Prateek's comments.
f04404cc2 [Yi Pan (Data Infrastructure)] SAMZA-1789: move createStreams out of the loop in prepareJobs
33753f72d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
12c09af06 [Yi Pan (Data Infrastructure)] SAMZA-1789: Fix a merging error (with SAMZA-1813)
a072118d0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
e7af6932d [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
8d4d3ffda [Yi Pan (Data Infrastructure)] Merge with master
055bd91e4 [Yi Pan (Data Infrastructure)] SAMZA-1789: fix unit test with ThreadJobFactory
247dcff4c [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
1621c4d00 [Yi Pan (Data Infrastructure)] SAMZA-1789: a few more fixes to address Cameron's reviews
6e446fe6d [Yi Pan (Data Infrastructure)] SAMZA-1789: address Cameron's review comments.
4382d45db [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
3b2f04d54 [Yi Pan (Data Infrastructure)] SAMZA-1789: moved all impl classes from samza-api to samza-core.
db96da830 [Yi Pan (Data Infrastructure)] SAMZA-1789: WIP - revision to address review feedbacks.
014337170 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
a82708bb0 [Yi Pan (Data Infrastructure)] SAMZA-1789: unify ApplicationDescriptor and ApplicationRunner for high- and low-level APIs in YARN and standalone environment
c4bb0dce6 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-runtime-with-processor-callbacks
f20cdcda6 [Yi Pan (Data Infrastructure)] WIP: adding unit tests. Pending update on StreamProcessorLifecycleListener, LocalContainerRunner, and SamzaContainerListener
973eb5261 [Yi Pan (Data Infrastructure)] WIP: compiles, still working on LocalContainerRunner refactor
fb1bc49e0 [Yi Pan (Data Infrastructure)] Merge branch 'master' into app-spec-with-app-runtime-Jul-16-18
30a4e5f0a [Yi Pan (Data Infrastructure)] WIP: application runner refactor - proto-type for SEP-13
95577b74c [Yi Pan (Data Infrastructure)] WIP: trying to figure out the two interface classes for spec: a) spec builder in init(); b) spec reader in all other lifecycle methods
42782d815 [Yi Pan (Data Infrastructure)] Merge branch 'prateek-remove-app-runner-stream-spec' into app-spec-with-app-runtime-Jul-16-18
d43e92319 [Yi Pan (Data Infrastructure)] WIP: proto-type with ApplicationRunnable and no ApplicationRunner exposed to user
f1cb8f0eb [Yi Pan (Data Infrastructure)] Merge branch 'master' into single-app-api-May-21-18
7e71dc7e0 [Yi Pan (Data Infrastructure)] Merge with master
856193013 [Prateek Maheshwari] Merge branch 'master' into stream-spec-cleanup
7d7aa5088 [Prateek Maheshwari] Updated with Cameron and Daniel's feedback.
8e6fc2dac [prateekm] Remove all usages of StreamSpec and ApplicationRunner from the operator spec and impl layers.
Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/cfbb9c6e
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/cfbb9c6e
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/cfbb9c6e

Branch: refs/heads/master
Commit: cfbb9c6ebabfe1e5e13af50a0487bde3f5c1e925
Parents: b842626
Author: Yi Pan (Data Infrastructure) <[email protected]>
Authored: Wed Sep 26 11:00:51 2018 -0700
Committer: Yi Pan (Data Infrastructure) <[email protected]>
Committed: Wed Sep 26 11:00:51 2018 -0700

----------------------------------------------------------------------
 build.gradle                                    |   1 +
 .../application/ApplicationDescriptorImpl.java  | 106 +++-
 .../StreamApplicationDescriptorImpl.java        |  54 +-
 .../TaskApplicationDescriptorImpl.java          |  14 +
 .../samza/execution/ExecutionPlanner.java       | 288 ++---
 .../execution/IntermediateStreamManager.java    | 297 +++++++++++
 .../org/apache/samza/execution/JobGraph.java    |  92 ++--
 .../samza/execution/JobGraphJsonGenerator.java  | 110 ++--
 .../org/apache/samza/execution/JobNode.java     | 365 +++----
 .../JobNodeConfigurationGenerator.java          | 361 +++++++++++++
 .../org/apache/samza/execution/JobPlanner.java  |  67 +--
 .../apache/samza/execution/LocalJobPlanner.java |  21 +-
 .../execution/OperatorSpecGraphAnalyzer.java    |  12 +-
 .../samza/execution/RemoteJobPlanner.java       |  20 +-
 .../samza/operators/BaseTableDescriptor.java    |   9 +
 .../samza/operators/OperatorSpecGraph.java      |   7 -
 .../samza/runtime/LocalContainerRunner.java     |   2 +-
 .../samza/table/TableConfigGenerator.java       |  48 --
 .../samza/zk/ZkJobCoordinatorFactory.java       |   5 +-
 .../org/apache/samza/config/JobConfig.scala     |   4 +-
 .../apache/samza/container/SamzaContainer.scala |   2 +-
 .../MetricsSnapshotReporterFactory.scala        |   1 -
 .../samza/util/CoordinatorStreamUtil.scala      |   2 +-
 .../TestStreamApplicationDescriptorImpl.java    |   3 +-
 .../TestTaskApplicationDescriptorImpl.java      |   8 +-
 .../execution/ExecutionPlannerTestBase.java     | 157 ++++++
 .../samza/execution/TestExecutionPlanner.java   | 181 ++++-
 .../TestIntermediateStreamManager.java          |  68 +++
 .../apache/samza/execution/TestJobGraph.java    |  34 +-
 .../execution/TestJobGraphJsonGenerator.java    | 147 +++++-
 .../org/apache/samza/execution/TestJobNode.java | 228 ---------
 .../TestJobNodeConfigurationGenerator.java      | 509 +++++++++++++++++++
 .../samza/execution/TestRemoteJobPlanner.java   |   2 +-
 .../samza/operators/TestOperatorSpecGraph.java  |   1 -
 .../operators/spec/OperatorSpecTestUtils.java   |   1 -
 .../runtime/TestLocalApplicationRunner.java     |   9 +-
 .../runtime/TestRemoteApplicationRunner.java    |   2 +-
 .../samza/system/hdfs/HdfsSystemFactory.scala   |   2 +-
 .../kafka/KafkaCheckpointManagerFactory.scala   |   2 +-
 .../samza/config/KafkaConsumerConfig.java       |  13 +-
 .../scala/org/apache/samza/util/KafkaUtil.scala |   2 +-
 .../kv/BaseLocalStoreBackedTableProvider.java   |  17 +-
 .../apache/samza/test/framework/TestRunner.java |  58 +--
 .../system/InMemorySystemDescriptor.java        |  10 +-
 .../table/TestTableDescriptorsProvider.java     |   6 -
 .../samza/validation/YarnJobValidationTool.java |   2 +-
 .../org/apache/samza/job/yarn/YarnJob.scala     |   4 +-
 47 files changed, 2162 insertions(+), 1192 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/build.gradle
----------------------------------------------------------------------
diff --git a/build.gradle b/build.gradle
index 48a28f2..a45a875 100644
--- a/build.gradle
+++ b/build.gradle
@@ -194,6 +194,7 @@ project(":samza-core_$scalaVersion") {
     testCompile "org.powermock:powermock-core:$powerMockVersion"
     testCompile "org.powermock:powermock-module-junit4:$powerMockVersion"
     testCompile "org.scalatest:scalatest_$scalaVersion:$scalaTestVersion"
+    testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
   }

   checkstyle {

http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java
---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java index 9679136..b58d5a5 100644 --- a/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/ApplicationDescriptorImpl.java @@ -19,6 +19,7 @@ package org.apache.samza.application; import java.util.Collections; +import java.util.HashMap; import java.util.LinkedHashMap; import java.util.Map; import java.util.Optional; @@ -26,13 +27,20 @@ import java.util.Set; import org.apache.samza.config.Config; import org.apache.samza.metrics.MetricsReporterFactory; import org.apache.samza.operators.ContextManager; +import org.apache.samza.operators.KV; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; import org.apache.samza.operators.descriptors.base.system.SystemDescriptor; +import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.runtime.ProcessorLifecycleListener; import org.apache.samza.runtime.ProcessorLifecycleListenerFactory; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; import org.apache.samza.task.TaskContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** @@ -46,10 +54,15 @@ import org.apache.samza.task.TaskContext; */ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> implements ApplicationDescriptor<S> { + private static final Logger LOGGER = LoggerFactory.getLogger(ApplicationDescriptorImpl.class); - final Config config; private final Class<? 
extends SamzaApplication> appClass; private final Map<String, MetricsReporterFactory> reporterFactories = new LinkedHashMap<>(); + // serdes used by input/output/intermediate streams, keyed by streamId + private final Map<String, KV<Serde, Serde>> streamSerdes = new HashMap<>(); + // serdes used by tables, keyed by tableId + private final Map<String, KV<Serde, Serde>> tableSerdes = new HashMap<>(); + final Config config; // Default to no-op functions in ContextManager // TODO: this should be replaced by shared context factory defined in SAMZA-1714 @@ -142,6 +155,35 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> } /** + * Get the corresponding key and value {@link Serde}s for the input stream {@code streamId} + * + * @param streamId id of the stream + * @return the serde pair for the stream. null if the serde is not defined or {@code streamId} does not exist + */ + public KV<Serde, Serde> getStreamSerdes(String streamId) { + return streamSerdes.get(streamId); + } + + /** + * Get the corresponding key and value {@link Serde}s for the table {@code tableId} + * + * @param tableId id of the table + * @return the serde pair for the table. null if the serde is not defined or {@code tableId} does not exist + */ + public KV<Serde, Serde> getTableSerdes(String tableId) { + return tableSerdes.get(tableId); + } + + /** + * Get the map of all {@link InputOperatorSpec}s in this application + * + * @return an immutable map from streamId to {@link InputOperatorSpec}. 
Default to empty map for low-level {@link TaskApplication} + */ + public Map<String, InputOperatorSpec> getInputOperators() { + return Collections.EMPTY_MAP; + } + + /** * Get all the {@link InputDescriptor}s to this application * * @return an immutable map of streamId to {@link InputDescriptor} @@ -176,4 +218,66 @@ public abstract class ApplicationDescriptorImpl<S extends ApplicationDescriptor> */ public abstract Set<SystemDescriptor> getSystemDescriptors(); + /** + * Get all the unique input streamIds in this application + * + * @return an immutable set of input streamIds + */ + public abstract Set<String> getInputStreamIds(); + + /** + * Get all the unique output streamIds in this application + * + * @return an immutable set of output streamIds + */ + public abstract Set<String> getOutputStreamIds(); + + KV<Serde, Serde> getOrCreateStreamSerdes(String streamId, Serde serde) { + Serde keySerde, valueSerde; + + KV<Serde, Serde> currentSerdePair = streamSerdes.get(streamId); + + if (serde instanceof KVSerde) { + keySerde = ((KVSerde) serde).getKeySerde(); + valueSerde = ((KVSerde) serde).getValueSerde(); + } else { + keySerde = new NoOpSerde(); + valueSerde = serde; + } + + if (currentSerdePair == null) { + if (keySerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + + ". Keys will not be (de)serialized"); + } + if (valueSerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + + ". Values will not be (de)serialized"); + } + streamSerdes.put(streamId, KV.of(keySerde, valueSerde)); + } else if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) { + throw new IllegalArgumentException(String.format("Serde for stream %s is already defined. 
Cannot change it to " + + "different serdes.", streamId)); + } + return streamSerdes.get(streamId); + } + + KV<Serde, Serde> getOrCreateTableSerdes(String tableId, KVSerde kvSerde) { + Serde keySerde, valueSerde; + keySerde = kvSerde.getKeySerde(); + valueSerde = kvSerde.getValueSerde(); + + if (!tableSerdes.containsKey(tableId)) { + tableSerdes.put(tableId, KV.of(keySerde, valueSerde)); + return tableSerdes.get(tableId); + } + + KV<Serde, Serde> currentSerdePair = tableSerdes.get(tableId); + if (!currentSerdePair.getKey().equals(keySerde) || !currentSerdePair.getValue().equals(valueSerde)) { + throw new IllegalArgumentException(String.format("Serde for table %s is already defined. Cannot change it to " + + "different serdes.", tableId)); + } + return tableSerdes.get(tableId); + } + } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java index d50b0d0..5129913 100644 --- a/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/StreamApplicationDescriptorImpl.java @@ -51,7 +51,6 @@ import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; @@ -78,7 +77,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S // We use a LHM 
for deterministic order in initializing and closing operators. private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); - private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); + private final Map<String, TableImpl> tables = new LinkedHashMap<>(); private final Set<String> operatorIds = new HashSet<>(); private Optional<SystemDescriptor> defaultSystemDescriptorOptional = Optional.empty(); @@ -125,7 +124,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S "getInputStream must not be called multiple times with the same streamId: " + streamId); Serde serde = inputDescriptor.getSerde(); - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde); if (outputStreams.containsKey(streamId)) { OutputStreamImpl outputStream = outputStreams.get(streamId); Serde keySerde = outputStream.getKeySerde(); @@ -156,7 +155,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S "getOutputStream must not be called multiple times with the same streamId: " + streamId); Serde serde = outputDescriptor.getSerde(); - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + KV<Serde, Serde> kvSerdes = getOrCreateStreamSerdes(streamId, serde); if (inputOperators.containsKey(streamId)) { InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId); Serde keySerde = inputOperatorSpec.getKeySerde(); @@ -186,13 +185,15 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId())); tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor); - TableSpec tableSpec = ((BaseTableDescriptor) tableDescriptor).getTableSpec(); - if (tables.containsKey(tableSpec)) { + BaseTableDescriptor 
baseTableDescriptor = (BaseTableDescriptor) tableDescriptor; + TableSpec tableSpec = baseTableDescriptor.getTableSpec(); + if (tables.containsKey(tableSpec.getId())) { throw new IllegalStateException( String.format("getTable() invoked multiple times with the same tableId: %s", tableId)); } - tables.put(tableSpec, new TableImpl(tableSpec)); - return tables.get(tableSpec); + tables.put(tableSpec.getId(), new TableImpl(tableSpec)); + getOrCreateTableSerdes(tableSpec.getId(), baseTableDescriptor.getSerde()); + return tables.get(tableSpec.getId()); } /** @@ -247,6 +248,16 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); } + @Override + public Set<String> getInputStreamIds() { + return Collections.unmodifiableSet(new HashSet<>(inputOperators.keySet())); + } + + @Override + public Set<String> getOutputStreamIds() { + return Collections.unmodifiableSet(new HashSet<>(outputStreams.keySet())); + } + /** * Get the default {@link SystemDescriptor} in this application * @@ -306,7 +317,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S return Collections.unmodifiableMap(outputStreams); } - public Map<TableSpec, TableImpl> getTables() { + public Map<String, TableImpl> getTables() { return Collections.unmodifiableMap(tables); } @@ -342,7 +353,7 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S kvSerdes = new KV<>(null, null); // and that key and msg serdes are provided for job.default.system in configs } else { isKeyed = serde instanceof KVSerde; - kvSerdes = getKVSerdes(streamId, serde); + kvSerdes = getOrCreateStreamSerdes(streamId, serde); } InputTransformer transformer = (InputTransformer) getDefaultSystemDescriptor() @@ -356,29 +367,6 @@ public class StreamApplicationDescriptorImpl extends ApplicationDescriptorImpl<S return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), 
outputStreams.get(streamId)); } - private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { - Serde keySerde, valueSerde; - - if (serde instanceof KVSerde) { - keySerde = ((KVSerde) serde).getKeySerde(); - valueSerde = ((KVSerde) serde).getValueSerde(); - } else { - keySerde = new NoOpSerde(); - valueSerde = serde; - } - - if (keySerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + - ". Keys will not be (de)serialized"); - } - if (valueSerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + - ". Values will not be (de)serialized"); - } - - return KV.of(keySerde, valueSerde); - } - // check uniqueness of the {@code systemDescriptor} and add if it is unique private void addSystemDescriptor(SystemDescriptor systemDescriptor) { Preconditions.checkState(!systemDescriptors.containsKey(systemDescriptor.getSystemName()) http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java index 3597d7c..d140a90 100644 --- a/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/application/TaskApplicationDescriptorImpl.java @@ -25,6 +25,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Set; import org.apache.samza.config.Config; +import org.apache.samza.operators.BaseTableDescriptor; import org.apache.samza.operators.TableDescriptor; import org.apache.samza.operators.descriptors.base.stream.InputDescriptor; import org.apache.samza.operators.descriptors.base.stream.OutputDescriptor; @@ -65,6 +66,7 @@ public class 
TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas // TODO: SAMZA-1841: need to add to the broadcast streams if inputDescriptor is for a broadcast stream Preconditions.checkState(!inputDescriptors.containsKey(inputDescriptor.getStreamId()), String.format("add input descriptors multiple times with the same streamId: %s", inputDescriptor.getStreamId())); + getOrCreateStreamSerdes(inputDescriptor.getStreamId(), inputDescriptor.getSerde()); inputDescriptors.put(inputDescriptor.getStreamId(), inputDescriptor); addSystemDescriptor(inputDescriptor.getSystemDescriptor()); } @@ -73,6 +75,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas public void addOutputStream(OutputDescriptor outputDescriptor) { Preconditions.checkState(!outputDescriptors.containsKey(outputDescriptor.getStreamId()), String.format("add output descriptors multiple times with the same streamId: %s", outputDescriptor.getStreamId())); + getOrCreateStreamSerdes(outputDescriptor.getStreamId(), outputDescriptor.getSerde()); outputDescriptors.put(outputDescriptor.getStreamId(), outputDescriptor); addSystemDescriptor(outputDescriptor.getSystemDescriptor()); } @@ -81,6 +84,7 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas public void addTable(TableDescriptor tableDescriptor) { Preconditions.checkState(!tableDescriptors.containsKey(tableDescriptor.getTableId()), String.format("add table descriptors multiple times with the same tableId: %s", tableDescriptor.getTableId())); + getOrCreateTableSerdes(tableDescriptor.getTableId(), ((BaseTableDescriptor) tableDescriptor).getSerde()); tableDescriptors.put(tableDescriptor.getTableId(), tableDescriptor); } @@ -111,6 +115,16 @@ public class TaskApplicationDescriptorImpl extends ApplicationDescriptorImpl<Tas return Collections.unmodifiableSet(new HashSet<>(systemDescriptors.values())); } + @Override + public Set<String> getInputStreamIds() { + return Collections.unmodifiableSet(new 
HashSet<>(inputDescriptors.keySet())); + } + + @Override + public Set<String> getOutputStreamIds() { + return Collections.unmodifiableSet(new HashSet<>(outputDescriptors.keySet())); + } + /** * Get the user-defined {@link TaskFactory} * @return the {@link TaskFactory} object http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java index 46aef8d..eea6387 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java +++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java @@ -22,72 +22,57 @@ package org.apache.samza.execution; import com.google.common.collect.HashMultimap; import com.google.common.collect.Multimap; import com.google.common.collect.Sets; -import java.util.ArrayList; import java.util.Collection; -import java.util.Collections; -import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; -import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.application.LegacyTaskApplication; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.ClusterManagerConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; import org.apache.samza.config.StreamConfig; -import org.apache.samza.operators.OperatorSpecGraph; -import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.JoinOperatorSpec; +import org.apache.samza.operators.BaseTableDescriptor; import 
org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.samza.execution.ExecutionPlanner.StreamEdgeSet.StreamEdgeSetCategory; import static org.apache.samza.util.StreamUtil.*; /** - * The ExecutionPlanner creates the physical execution graph for the {@link OperatorSpecGraph}, and + * The ExecutionPlanner creates the physical execution graph for the {@link ApplicationDescriptorImpl}, and * the intermediate topics needed for the execution. */ // TODO: ExecutionPlanner needs to be able to generate single node JobGraph for low-level TaskApplication as well (SAMZA-1811) public class ExecutionPlanner { private static final Logger log = LoggerFactory.getLogger(ExecutionPlanner.class); - /* package private */ static final int MAX_INFERRED_PARTITIONS = 256; - private final Config config; - private final StreamConfig streamConfig; private final StreamManager streamManager; public ExecutionPlanner(Config config, StreamManager streamManager) { this.config = config; this.streamManager = streamManager; - this.streamConfig = new StreamConfig(config); } - public ExecutionPlan plan(OperatorSpecGraph opSpecGraph) { + public ExecutionPlan plan(ApplicationDescriptorImpl<? 
extends ApplicationDescriptor> appDesc) { validateConfig(); - // Create physical job graph based on stream graph - JobGraph jobGraph = createJobGraph(opSpecGraph); - - // Fetch the external streams partition info - fetchInputAndOutputStreamPartitions(jobGraph); + // create physical job graph based on stream graph + JobGraph jobGraph = createJobGraph(config, appDesc); - // Verify agreement in partition count between all joined input/intermediate streams - validateJoinInputStreamPartitions(jobGraph); + // fetch the external streams partition info + setInputAndOutputStreamPartitionCount(jobGraph, streamManager); - if (!jobGraph.getIntermediateStreamEdges().isEmpty()) { - // Set partition count of intermediate streams not participating in joins - setIntermediateStreamPartitions(jobGraph); - - // Validate partition counts were assigned for all intermediate streams - validateIntermediateStreamPartitions(jobGraph); - } + // figure out the partitions for internal streams + new IntermediateStreamManager(config, appDesc).calculatePartitions(jobGraph); return jobGraph; } @@ -103,21 +88,23 @@ public class ExecutionPlanner { } /** - * Creates the physical graph from {@link OperatorSpecGraph} + * Create the physical graph from {@link ApplicationDescriptorImpl} */ - /* package private */ JobGraph createJobGraph(OperatorSpecGraph opSpecGraph) { - JobGraph jobGraph = new JobGraph(config, opSpecGraph); - + /* package private */ + JobGraph createJobGraph(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { + JobGraph jobGraph = new JobGraph(config, appDesc); + StreamConfig streamConfig = new StreamConfig(config); // Source streams contain both input and intermediate streams. - Set<StreamSpec> sourceStreams = getStreamSpecs(opSpecGraph.getInputOperators().keySet(), streamConfig); + Set<StreamSpec> sourceStreams = getStreamSpecs(appDesc.getInputStreamIds(), streamConfig); // Sink streams contain both output and intermediate streams. 
- Set<StreamSpec> sinkStreams = getStreamSpecs(opSpecGraph.getOutputStreams().keySet(), streamConfig); + Set<StreamSpec> sinkStreams = getStreamSpecs(appDesc.getOutputStreamIds(), streamConfig); Set<StreamSpec> intermediateStreams = Sets.intersection(sourceStreams, sinkStreams); Set<StreamSpec> inputStreams = Sets.difference(sourceStreams, intermediateStreams); Set<StreamSpec> outputStreams = Sets.difference(sinkStreams, intermediateStreams); - Set<TableSpec> tables = opSpecGraph.getTables().keySet(); + Set<TableSpec> tables = appDesc.getTableDescriptors().stream() + .map(tableDescriptor -> ((BaseTableDescriptor) tableDescriptor).getTableSpec()).collect(Collectors.toSet()); // For this phase, we have a single job node for the whole dag String jobName = config.get(JobConfig.JOB_NAME()); @@ -136,15 +123,20 @@ public class ExecutionPlanner { // Add tables tables.forEach(spec -> jobGraph.addTable(spec, node)); - jobGraph.validate(); + if (!LegacyTaskApplication.class.isAssignableFrom(appDesc.getAppClass())) { + // skip the validation when input streamIds are empty. This is only possible for LegacyTaskApplication + jobGraph.validate(); + } return jobGraph; } /** - * Fetches the partitions of input/output streams and update the corresponding StreamEdges. + * Fetch the partitions of source/sink streams and update the StreamEdges. + * @param jobGraph {@link JobGraph} + * @param streamManager the {@link StreamManager} to interface with the streams. 
*/ - /* package private */ void fetchInputAndOutputStreamPartitions(JobGraph jobGraph) { + /* package private */ static void setInputAndOutputStreamPartitionCount(JobGraph jobGraph, StreamManager streamManager) { Set<StreamEdge> existingStreams = new HashSet<>(); existingStreams.addAll(jobGraph.getInputStreams()); existingStreams.addAll(jobGraph.getOutputStreams()); @@ -182,224 +174,4 @@ public class ExecutionPlanner { } } - /** - * Validates agreement in partition count between input/intermediate streams participating in join operations. - */ - private void validateJoinInputStreamPartitions(JobGraph jobGraph) { - // Group input operator specs (input/intermediate streams) by the joins they participate in. - Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = - OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(jobGraph.getSpecGraph()); - - // Convert every group of input operator specs into a group of corresponding stream edges. - List<StreamEdgeSet> streamEdgeSets = new ArrayList<>(); - for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) { - Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec); - StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph); - streamEdgeSets.add(streamEdgeSet); - } - - /* - * Sort the stream edge groups by their category so they appear in this order: - * 1. groups composed exclusively of stream edges with set partition counts - * 2. groups composed of a mix of stream edges with set/unset partition counts - * 3. groups composed exclusively of stream edges with unset partition counts - * - * This guarantees that we process the most constrained stream edge groups first, - * which is crucial for intermediate stream edges that are members of multiple - * stream edge groups. 
For instance, if we have the following groups of stream - * edges (partition counts in parentheses, question marks for intermediate streams): - * - * a. e1 (16), e2 (16) - * b. e2 (16), e3 (?) - * c. e3 (?), e4 (?) - * - * processing them in the above order (most constrained first) is guaranteed to - * yield correct assignment of partition counts of e3 and e4 in a single scan. - */ - Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder())); - - // Verify agreement between joined input/intermediate streams. - // This may involve setting partition counts of intermediate stream edges. - streamEdgeSets.forEach(ExecutionPlanner::validateAndAssignStreamEdgeSetPartitions); - } - - /** - * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s - * correspond to the provided {@code inputOpSpecs}. - */ - private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs, - JobGraph jobGraph) { - - int countStreamEdgeWithSetPartitions = 0; - Set<StreamEdge> streamEdges = new HashSet<>(); - - for (InputOperatorSpec inputOpSpec : inputOpSpecs) { - StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(inputOpSpec.getStreamId(), streamConfig)); - if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) { - ++countStreamEdgeWithSetPartitions; - } - streamEdges.add(streamEdge); - } - - // Determine category of stream group based on stream partition counts. - StreamEdgeSetCategory category; - if (countStreamEdgeWithSetPartitions == 0) { - category = StreamEdgeSetCategory.NO_PARTITION_COUNT_SET; - } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) { - category = StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET; - } else { - category = StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET; - } - - return new StreamEdgeSet(setId, streamEdges, category); - } - - /** - * Sets partition count of intermediate streams which have not been assigned partition counts. 
- */ - private void setIntermediateStreamPartitions(JobGraph jobGraph) { - final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(); - int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN); - if (partitions == StreamEdge.PARTITIONS_UNKNOWN) { - // use the following simple algo to figure out the partitions - // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions)) - // partition will be further bounded by MAX_INFERRED_PARTITIONS. - // This is important when running in hadoop where an HDFS input can have lots of files (partitions). - int maxInPartitions = maxPartitions(jobGraph.getInputStreams()); - int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams()); - partitions = Math.max(maxInPartitions, maxOutPartitions); - - if (partitions > MAX_INFERRED_PARTITIONS) { - partitions = MAX_INFERRED_PARTITIONS; - log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.", - partitions, MAX_INFERRED_PARTITIONS)); - } - } else { - // Reject any zero or other negative values explicitly specified in config. - if (partitions <= 0) { - throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions, - defaultPartitionsConfigProperty)); - } - - log.info("Using partition count value {} specified for config property {}", partitions, - defaultPartitionsConfigProperty); - } - - for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { - if (edge.getPartitionCount() <= 0) { - log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions); - edge.setPartitionCount(partitions); - } - } - } - - /** - * Ensures all intermediate streams have been assigned partition counts. 
- */ - private static void validateIntermediateStreamPartitions(JobGraph jobGraph) { - for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { - if (edge.getPartitionCount() <= 0) { - throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName())); - } - } - } - - /** - * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count. - * This may include setting partition counts of intermediate streams in this set that do not - * have their partition counts set. - */ - private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) { - Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges(); - StreamEdge firstStreamEdgeWithSetPartitions = - streamEdges.stream() - .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) - .findFirst() - .orElse(null); - - // This group consists exclusively of intermediate streams with unknown partition counts. - // We cannot do any validation/computation of partition counts of such streams right here, - // but they are tackled later in the ExecutionPlanner. - if (firstStreamEdgeWithSetPartitions == null) { - return; - } - - // Make sure all other stream edges in this group have the same partition count. - int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount(); - for (StreamEdge streamEdge : streamEdges) { - int streamPartitions = streamEdge.getPartitionCount(); - if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) { - streamEdge.setPartitionCount(partitions); - log.info("Inferred the partition count {} for the join operator {} from {}." - , new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()}); - } else if (streamPartitions != partitions) { - throw new SamzaException(String.format( - "Unable to resolve input partitions of stream %s for the join %s. 
Expected: %d, Actual: %d", - streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions)); - } - } - } - - /* package private */ static int maxPartitions(Collection<StreamEdge> edges) { - return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN); - } - - /** - * Represents a set of {@link StreamEdge}s. - */ - /* package private */ static class StreamEdgeSet { - - /** - * Indicates whether all stream edges in this group have their partition counts assigned. - */ - public enum StreamEdgeSetCategory { - /** - * All stream edges in this group have their partition counts assigned. - */ - ALL_PARTITION_COUNT_SET(0), - - /** - * Only some stream edges in this group have their partition counts assigned. - */ - SOME_PARTITION_COUNT_SET(1), - - /** - * No stream edge in this group is assigned a partition count. - */ - NO_PARTITION_COUNT_SET(2); - - - private final int sortOrder; - - StreamEdgeSetCategory(int sortOrder) { - this.sortOrder = sortOrder; - } - - public int getSortOrder() { - return sortOrder; - } - } - - private final String setId; - private final Set<StreamEdge> streamEdges; - private final StreamEdgeSetCategory category; - - public StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) { - this.setId = setId; - this.streamEdges = streamEdges; - this.category = category; - } - - public Set<StreamEdge> getStreamEdges() { - return streamEdges; - } - - public String getSetId() { - return setId; - } - - public StreamEdgeSetCategory getCategory() { - return category; - } - } } http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java new 
file mode 100644 index 0000000..66cbe6a --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/execution/IntermediateStreamManager.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.samza.execution; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.Multimap; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.samza.SamzaException; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.JoinOperatorSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * {@link IntermediateStreamManager} calculates intermediate stream partitions based on the high-level application graph. 
+ */ +class IntermediateStreamManager { + + private static final Logger log = LoggerFactory.getLogger(IntermediateStreamManager.class); + + private final Config config; + private final Map<String, InputOperatorSpec> inputOperators; + + @VisibleForTesting + static final int MAX_INFERRED_PARTITIONS = 256; + + IntermediateStreamManager(Config config, ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc) { + this.config = config; + this.inputOperators = appDesc.getInputOperators(); + } + + /** + * Figure out the number of partitions of all streams + */ + /* package private */ void calculatePartitions(JobGraph jobGraph) { + + // Verify agreement in partition count between all joined input/intermediate streams + validateJoinInputStreamPartitions(jobGraph); + + if (!jobGraph.getIntermediateStreamEdges().isEmpty()) { + // Set partition count of intermediate streams not participating in joins + setIntermediateStreamPartitions(jobGraph); + + // Validate partition counts were assigned for all intermediate streams + validateIntermediateStreamPartitions(jobGraph); + } + } + + /** + * Validates agreement in partition count between input/intermediate streams participating in join operations. + */ + private void validateJoinInputStreamPartitions(JobGraph jobGraph) { + // Group input operator specs (input/intermediate streams) by the joins they participate in. + Multimap<JoinOperatorSpec, InputOperatorSpec> joinOpSpecToInputOpSpecs = + OperatorSpecGraphAnalyzer.getJoinToInputOperatorSpecs(inputOperators.values()); + + // Convert every group of input operator specs into a group of corresponding stream edges. 
+ List<StreamEdgeSet> streamEdgeSets = new ArrayList<>(); + for (JoinOperatorSpec joinOpSpec : joinOpSpecToInputOpSpecs.keySet()) { + Collection<InputOperatorSpec> joinedInputOpSpecs = joinOpSpecToInputOpSpecs.get(joinOpSpec); + StreamEdgeSet streamEdgeSet = getStreamEdgeSet(joinOpSpec.getOpId(), joinedInputOpSpecs, jobGraph); + streamEdgeSets.add(streamEdgeSet); + } + + /* + * Sort the stream edge groups by their category so they appear in this order: + * 1. groups composed exclusively of stream edges with set partition counts + * 2. groups composed of a mix of stream edges with set/unset partition counts + * 3. groups composed exclusively of stream edges with unset partition counts + * + * This guarantees that we process the most constrained stream edge groups first, + * which is crucial for intermediate stream edges that are members of multiple + * stream edge groups. For instance, if we have the following groups of stream + * edges (partition counts in parentheses, question marks for intermediate streams): + * + * a. e1 (16), e2 (16) + * b. e2 (16), e3 (?) + * c. e3 (?), e4 (?) + * + * processing them in the above order (most constrained first) is guaranteed to + * yield correct assignment of partition counts of e3 and e4 in a single scan. + */ + Collections.sort(streamEdgeSets, Comparator.comparingInt(e -> e.getCategory().getSortOrder())); + + // Verify agreement between joined input/intermediate streams. + // This may involve setting partition counts of intermediate stream edges. + streamEdgeSets.forEach(IntermediateStreamManager::validateAndAssignStreamEdgeSetPartitions); + } + + /** + * Creates a {@link StreamEdgeSet} whose Id is {@code setId}, and {@link StreamEdge}s + * correspond to the provided {@code inputOpSpecs}. 
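The most-constrained-first ordering described in the comment above can be sketched in isolation. This is an editor's sketch with hypothetical names; the real code sorts `StreamEdgeSet`s by `StreamEdgeSetCategory` and propagates counts in `validateAndAssignStreamEdgeSetPartitions`, and `UNKNOWN` here stands in for `StreamEdge.PARTITIONS_UNKNOWN`:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Map;

// Editor's sketch: sort join groups so fully-known groups come first,
// then one scan propagates counts into unknown (intermediate) edges.
public class ConstrainedFirst {
  static final int UNKNOWN = -1;

  static void resolve(List<List<String>> groups, Map<String, Integer> counts) {
    // 0 = all known, 1 = some known, 2 = none known (mirrors StreamEdgeSetCategory)
    groups.sort(Comparator.comparingInt((List<String> g) -> category(g, counts)));
    for (List<String> group : groups) {
      int known = group.stream().map(counts::get)
          .filter(c -> c != UNKNOWN).findFirst().orElse(UNKNOWN);
      if (known == UNKNOWN) continue; // nothing to propagate from this group yet
      for (String edge : group) {
        if (counts.get(edge) == UNKNOWN) counts.put(edge, known);
        else if (counts.get(edge) != known) throw new IllegalStateException("partition mismatch on " + edge);
      }
    }
  }

  static int category(List<String> group, Map<String, Integer> counts) {
    long known = group.stream().filter(e -> counts.get(e) != UNKNOWN).count();
    return known == group.size() ? 0 : (known > 0 ? 1 : 2);
  }
}
```

Running the comment's example, groups {e1, e2}, {e2, e3}, {e3, e4} with e1 = e2 = 16 and e3, e4 unknown: processing in sorted order lets {e2, e3} fix e3 = 16, after which {e3, e4} fixes e4 = 16 in the same scan.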
+ */ + private StreamEdgeSet getStreamEdgeSet(String setId, Iterable<InputOperatorSpec> inputOpSpecs, + JobGraph jobGraph) { + + int countStreamEdgeWithSetPartitions = 0; + Set<StreamEdge> streamEdges = new HashSet<>(); + + for (InputOperatorSpec inputOpSpec : inputOpSpecs) { + StreamEdge streamEdge = jobGraph.getStreamEdge(inputOpSpec.getStreamId()); + if (streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) { + ++countStreamEdgeWithSetPartitions; + } + streamEdges.add(streamEdge); + } + + // Determine category of stream group based on stream partition counts. + StreamEdgeSet.StreamEdgeSetCategory category; + if (countStreamEdgeWithSetPartitions == 0) { + category = StreamEdgeSet.StreamEdgeSetCategory.NO_PARTITION_COUNT_SET; + } else if (countStreamEdgeWithSetPartitions == streamEdges.size()) { + category = StreamEdgeSet.StreamEdgeSetCategory.ALL_PARTITION_COUNT_SET; + } else { + category = StreamEdgeSet.StreamEdgeSetCategory.SOME_PARTITION_COUNT_SET; + } + + return new StreamEdgeSet(setId, streamEdges, category); + } + + /** + * Sets partition count of intermediate streams which have not been assigned partition counts. + */ + private void setIntermediateStreamPartitions(JobGraph jobGraph) { + final String defaultPartitionsConfigProperty = JobConfig.JOB_INTERMEDIATE_STREAM_PARTITIONS(); + int partitions = config.getInt(defaultPartitionsConfigProperty, StreamEdge.PARTITIONS_UNKNOWN); + if (partitions == StreamEdge.PARTITIONS_UNKNOWN) { + // use the following simple algo to figure out the partitions + // partition = MAX(MAX(Input topic partitions), MAX(Output topic partitions)) + // partition will be further bounded by MAX_INFERRED_PARTITIONS. + // This is important when running in hadoop where an HDFS input can have lots of files (partitions). 
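The inference rule stated in the comment above (`partition = MAX(MAX(input partitions), MAX(output partitions))`, bounded by `MAX_INFERRED_PARTITIONS`) can be written out directly. An editor's sketch with a hypothetical method name, using the 256 cap from this patch:

```java
// Editor's sketch of the default intermediate-partition inference:
// take the largest input or output partition count, then cap it. The cap
// matters for HDFS inputs, where each file counts as a partition and the
// raw maximum can be very large.
public class PartitionInference {
  static final int MAX_INFERRED_PARTITIONS = 256;

  static int inferIntermediatePartitions(int[] inputPartitions, int[] outputPartitions) {
    int max = 0;
    for (int p : inputPartitions) max = Math.max(max, p);
    for (int p : outputPartitions) max = Math.max(max, p);
    return Math.min(max, MAX_INFERRED_PARTITIONS);
  }
}
```

For example, inputs with 8 and 64 partitions and an output with 16 infer 64; an HDFS-style input with 1024 partitions is capped to 256. Setting `job.intermediate.stream.partitions` explicitly bypasses this inference entirely, as the surrounding code shows.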
+ int maxInPartitions = maxPartitions(jobGraph.getInputStreams()); + int maxOutPartitions = maxPartitions(jobGraph.getOutputStreams()); + partitions = Math.max(maxInPartitions, maxOutPartitions); + + if (partitions > MAX_INFERRED_PARTITIONS) { + partitions = MAX_INFERRED_PARTITIONS; + log.warn(String.format("Inferred intermediate stream partition count %d is greater than the max %d. Using the max.", + partitions, MAX_INFERRED_PARTITIONS)); + } + } else { + // Reject any zero or other negative values explicitly specified in config. + if (partitions <= 0) { + throw new SamzaException(String.format("Invalid value %d specified for config property %s", partitions, + defaultPartitionsConfigProperty)); + } + + log.info("Using partition count value {} specified for config property {}", partitions, + defaultPartitionsConfigProperty); + } + + for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { + if (edge.getPartitionCount() <= 0) { + log.info("Set the partition count for intermediate stream {} to {}.", edge.getName(), partitions); + edge.setPartitionCount(partitions); + } + } + } + + /** + * Ensures all intermediate streams have been assigned partition counts. + */ + private static void validateIntermediateStreamPartitions(JobGraph jobGraph) { + for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) { + if (edge.getPartitionCount() <= 0) { + throw new SamzaException(String.format("Failed to assign valid partition count to Stream %s", edge.getName())); + } + } + } + + /** + * Ensures that all streams in the supplied {@link StreamEdgeSet} agree in partition count. + * This may include setting partition counts of intermediate streams in this set that do not + * have their partition counts set. 
+ */ + private static void validateAndAssignStreamEdgeSetPartitions(StreamEdgeSet streamEdgeSet) { + Set<StreamEdge> streamEdges = streamEdgeSet.getStreamEdges(); + StreamEdge firstStreamEdgeWithSetPartitions = + streamEdges.stream() + .filter(streamEdge -> streamEdge.getPartitionCount() != StreamEdge.PARTITIONS_UNKNOWN) + .findFirst() + .orElse(null); + + // This group consists exclusively of intermediate streams with unknown partition counts. + // We cannot do any validation/computation of partition counts of such streams right here, + // but they are tackled later in the ExecutionPlanner. + if (firstStreamEdgeWithSetPartitions == null) { + return; + } + + // Make sure all other stream edges in this group have the same partition count. + int partitions = firstStreamEdgeWithSetPartitions.getPartitionCount(); + for (StreamEdge streamEdge : streamEdges) { + int streamPartitions = streamEdge.getPartitionCount(); + if (streamPartitions == StreamEdge.PARTITIONS_UNKNOWN) { + streamEdge.setPartitionCount(partitions); + log.info("Inferred the partition count {} for the join operator {} from {}.", + new Object[] {partitions, streamEdgeSet.getSetId(), firstStreamEdgeWithSetPartitions.getName()}); + } else if (streamPartitions != partitions) { + throw new SamzaException(String.format( + "Unable to resolve input partitions of stream %s for the join %s. Expected: %d, Actual: %d", + streamEdge.getName(), streamEdgeSet.getSetId(), partitions, streamPartitions)); + } + } + } + + /* package private */ static int maxPartitions(Collection<StreamEdge> edges) { + return edges.stream().mapToInt(StreamEdge::getPartitionCount).max().orElse(StreamEdge.PARTITIONS_UNKNOWN); + } + + /** + * Represents a set of {@link StreamEdge}s. + */ + /* package private */ static class StreamEdgeSet { + + /** + * Indicates whether all stream edges in this group have their partition counts assigned. 
+ */ + public enum StreamEdgeSetCategory { + /** + * All stream edges in this group have their partition counts assigned. + */ + ALL_PARTITION_COUNT_SET(0), + + /** + * Only some stream edges in this group have their partition counts assigned. + */ + SOME_PARTITION_COUNT_SET(1), + + /** + * No stream edge in this group is assigned a partition count. + */ + NO_PARTITION_COUNT_SET(2); + + + private final int sortOrder; + + StreamEdgeSetCategory(int sortOrder) { + this.sortOrder = sortOrder; + } + + public int getSortOrder() { + return sortOrder; + } + } + + private final String setId; + private final Set<StreamEdge> streamEdges; + private final StreamEdgeSetCategory category; + + StreamEdgeSet(String setId, Set<StreamEdge> streamEdges, StreamEdgeSetCategory category) { + this.setId = setId; + this.streamEdges = streamEdges; + this.category = category; + } + + Set<StreamEdge> getStreamEdges() { + return streamEdges; + } + + String getSetId() { + return setId; + } + + StreamEdgeSetCategory getCategory() { + return category; + } + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java index 5b19095..d975188 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java @@ -31,10 +31,11 @@ import java.util.Queue; import java.util.Set; import java.util.stream.Collectors; +import org.apache.samza.application.ApplicationDescriptor; +import org.apache.samza.application.ApplicationDescriptorImpl; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.config.Config; import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.OperatorSpecGraph; import 
org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; import org.slf4j.Logger; @@ -59,16 +60,21 @@ import org.slf4j.LoggerFactory; private final Set<StreamEdge> intermediateStreams = new HashSet<>(); private final Set<TableSpec> tables = new HashSet<>(); private final Config config; - private final JobGraphJsonGenerator jsonGenerator = new JobGraphJsonGenerator(); - private final OperatorSpecGraph specGraph; + private final JobGraphJsonGenerator jsonGenerator; + private final JobNodeConfigurationGenerator configGenerator; + private final ApplicationDescriptorImpl<? extends ApplicationDescriptor> appDesc; /** * The JobGraph is only constructed by the {@link ExecutionPlanner}. - * @param config Config + * + * @param config configuration for the application + * @param appDesc {@link ApplicationDescriptorImpl} describing the application */ - JobGraph(Config config, OperatorSpecGraph specGraph) { + JobGraph(Config config, ApplicationDescriptorImpl appDesc) { this.config = config; - this.specGraph = specGraph; + this.appDesc = appDesc; + this.jsonGenerator = new JobGraphJsonGenerator(); + this.configGenerator = new JobNodeConfigurationGenerator(); } @Override @@ -91,11 +97,6 @@ import org.slf4j.LoggerFactory; .collect(Collectors.toList()); } - void addTable(TableSpec tableSpec, JobNode node) { - tables.add(tableSpec); - node.addTable(tableSpec); - } - @Override public String getPlanAsJson() throws Exception { return jsonGenerator.toJson(this); @@ -105,14 +106,11 @@ import org.slf4j.LoggerFactory; * Returns the config for this application * @return {@link ApplicationConfig} */ + @Override public ApplicationConfig getApplicationConfig() { return new ApplicationConfig(config); } - public OperatorSpecGraph getSpecGraph() { - return specGraph; - } - /** * Add a source stream to a {@link JobNode} * @param streamSpec input stream @@ -152,20 +150,20 @@ import org.slf4j.LoggerFactory; intermediateStreams.add(edge); } + void addTable(TableSpec tableSpec, 
JobNode node) { + tables.add(tableSpec); + node.addTable(tableSpec); + } + /** * Get the {@link JobNode}. Create one if it does not exist. * @param jobName name of the job * @param jobId id of the job - * @return + * @return {@link JobNode} created with {@code jobName} and {@code jobId} */ JobNode getOrCreateJobNode(String jobName, String jobId) { - String nodeId = JobNode.createId(jobName, jobId); - JobNode node = nodes.get(nodeId); - if (node == null) { - node = new JobNode(jobName, jobId, specGraph, config); - nodes.put(nodeId, node); - } - return node; + String nodeId = JobNode.createJobNameAndId(jobName, jobId); + return nodes.computeIfAbsent(nodeId, k -> new JobNode(jobName, jobId, config, appDesc, configGenerator)); } /** @@ -178,20 +176,13 @@ import org.slf4j.LoggerFactory; } /** - * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist. - * @param streamSpec spec of the StreamEdge - * @param isIntermediate boolean flag indicating whether it's an intermediate stream + * Get the {@link StreamEdge} for {@code streamId}. + * + * @param streamId the streamId for the {@link StreamEdge} * @return stream edge */ - StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) { - String streamId = streamSpec.getId(); - StreamEdge edge = edges.get(streamId); - if (edge == null) { - boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId); - edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config); - edges.put(streamId, edge); - } - return edge; + StreamEdge getStreamEdge(String streamId) { + return edges.get(streamId); } /** @@ -248,6 +239,23 @@ import org.slf4j.LoggerFactory; } /** + * Get the {@link StreamEdge} for a {@link StreamSpec}. Create one if it does not exist. 
+ * @param streamSpec spec of the StreamEdge + * @param isIntermediate boolean flag indicating whether it's an intermediate stream + * @return stream edge + */ + private StreamEdge getOrCreateStreamEdge(StreamSpec streamSpec, boolean isIntermediate) { + String streamId = streamSpec.getId(); + StreamEdge edge = edges.get(streamId); + if (edge == null) { + boolean isBroadcast = appDesc.getBroadcastStreams().contains(streamId); + edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config); + edges.put(streamId, edge); + } + return edge; + } + + /** * Validate the input streams should have indegree being 0 and outdegree greater than 0 */ private void validateInputStreams() { @@ -305,7 +313,7 @@ import org.slf4j.LoggerFactory; Set<JobNode> unreachable = new HashSet<>(nodes.values()); unreachable.removeAll(reachable); throw new IllegalArgumentException(String.format("Jobs %s cannot be reached from Sources.", - String.join(", ", unreachable.stream().map(JobNode::getId).collect(Collectors.toList())))); + String.join(", ", unreachable.stream().map(JobNode::getJobNameAndId).collect(Collectors.toList())))); } } @@ -325,7 +333,7 @@ import org.slf4j.LoggerFactory; while (!queue.isEmpty()) { JobNode node = queue.poll(); - node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> { + node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(target -> { if (!visited.contains(target)) { visited.add(target); queue.offer(target); @@ -351,9 +359,9 @@ import org.slf4j.LoggerFactory; Map<String, Long> indegree = new HashMap<>(); Set<JobNode> visited = new HashSet<>(); pnodes.forEach(node -> { - String nid = node.getId(); + String nid = node.getJobNameAndId(); //only count the degrees of intermediate streams - long degree = node.getInEdges().stream().filter(e -> !inputStreams.contains(e)).count(); + long degree = node.getInEdges().values().stream().filter(e -> !inputStreams.contains(e)).count(); 
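The in-degree bookkeeping in the surrounding `JobGraph` code (counting only intermediate in-edges, then repeatedly removing zero-degree nodes) is Kahn's topological sort. An editor's sketch over a plain adjacency map, with hypothetical node names; the real code walks `StreamEdge`s between `JobNode`s and handles unreachable nodes separately:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Editor's sketch of Kahn's algorithm: seed the queue with zero-indegree
// nodes, and each time an edge is "consumed", enqueue targets that drop to zero.
public class KahnSort {
  static List<String> topoSort(Map<String, List<String>> outEdges) {
    Map<String, Integer> indegree = new HashMap<>();
    outEdges.keySet().forEach(n -> indegree.put(n, 0));
    outEdges.values().forEach(ts -> ts.forEach(t -> indegree.merge(t, 1, Integer::sum)));

    Deque<String> q = new ArrayDeque<>();
    indegree.forEach((n, d) -> { if (d == 0) q.add(n); });

    List<String> sorted = new ArrayList<>();
    while (!q.isEmpty()) {
      String n = q.poll();
      sorted.add(n);
      for (String t : outEdges.getOrDefault(n, Collections.emptyList())) {
        if (indegree.merge(t, -1, Integer::sum) == 0) q.add(t);
      }
    }
    return sorted; // shorter than the node count iff the graph has a cycle
  }
}
```

For a chain job-a → job-b → job-c this yields [job-a, job-b, job-c]; nodes left with positive in-degree at the end indicate a cycle, which is why the surrounding code falls back to picking the minimum-degree reachable node.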
indegree.put(nid, degree); if (degree == 0L) { @@ -378,8 +386,8 @@ import org.slf4j.LoggerFactory; while (!q.isEmpty()) { JobNode node = q.poll(); sortedNodes.add(node); - node.getOutEdges().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> { - String nid = n.getId(); + node.getOutEdges().values().stream().flatMap(edge -> edge.getTargetNodes().stream()).forEach(n -> { + String nid = n.getJobNameAndId(); Long degree = indegree.get(nid) - 1; indegree.put(nid, degree); if (degree == 0L && !visited.contains(n)) { @@ -400,7 +408,7 @@ import org.slf4j.LoggerFactory; long min = Long.MAX_VALUE; JobNode minNode = null; for (JobNode node : reachable) { - Long degree = indegree.get(node.getId()); + Long degree = indegree.get(node.getJobNameAndId()); if (degree < min) { min = degree; minNode = node; http://git-wip-us.apache.org/repos/asf/samza/blob/cfbb9c6e/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java index 91453d2..18705e4 100644 --- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java +++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java @@ -32,7 +32,6 @@ import java.util.stream.Collectors; import org.apache.samza.config.ApplicationConfig; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.spec.PartitionByOperatorSpec; @@ -140,7 +139,7 @@ import org.codehaus.jackson.map.ObjectMapper; jobGraph.getTables().forEach(t -> buildTableJson(t, jobGraphJson.tables)); 
     jobGraphJson.jobs = jobGraph.getJobNodes().stream()
-        .map(jobNode -> buildJobNodeJson(jobNode))
+        .map(this::buildJobNodeJson)
         .collect(Collectors.toList());

     ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -149,54 +148,12 @@ import org.codehaus.jackson.map.ObjectMapper;
     return new String(out.toByteArray());
   }

-  /**
-   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.operators.StreamGraph} for this job
-   * @param jobNode job node in the {@link JobGraph}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
-   */
-  private JobNodeJson buildJobNodeJson(JobNode jobNode) {
-    JobNodeJson job = new JobNodeJson();
-    job.jobName = jobNode.getJobName();
-    job.jobId = jobNode.getJobId();
-    job.operatorGraph = buildOperatorGraphJson(jobNode);
-    return job;
-  }
-
-  /**
-   * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
-   * @param jobNode job node in the {@link JobGraph}
-   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
-   */
-  private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
-    OperatorGraphJson opGraph = new OperatorGraphJson();
-    opGraph.inputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getInputOperators().forEach((streamId, operatorSpec) -> {
-        StreamJson inputJson = new StreamJson();
-        opGraph.inputStreams.add(inputJson);
-        inputJson.streamId = streamId;
-        inputJson.nextOperatorIds = operatorSpec.getRegisteredOperatorSpecs().stream()
-            .map(OperatorSpec::getOpId).collect(Collectors.toSet());
-
-        updateOperatorGraphJson(operatorSpec, opGraph);
-      });
-
-    opGraph.outputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
-        StreamJson outputJson = new StreamJson();
-        outputJson.streamId = streamId;
-        opGraph.outputStreams.add(outputJson);
-      });
-    return opGraph;
-  }
-
-  /**
-   * Traverse the {@link OperatorSpec} graph recursively and update the operator graph JSON POJO.
-   * @param operatorSpec input
-   * @param opGraph operator graph to build
-   */
   private void updateOperatorGraphJson(OperatorSpec operatorSpec, OperatorGraphJson opGraph) {
-    // TODO xiliu: render input operators instead of input streams
-    if (operatorSpec.getOpCode() != OpCode.INPUT) {
+    if (operatorSpec == null) {
+      // task application may not have any defined OperatorSpec
+      return;
+    }
+    if (operatorSpec.getOpCode() != OperatorSpec.OpCode.INPUT) {
       opGraph.operators.put(operatorSpec.getOpId(), operatorToMap(operatorSpec));
     }
     Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
@@ -243,6 +200,46 @@ import org.codehaus.jackson.map.ObjectMapper;
   }

   /**
+   * Create JSON POJO for a {@link JobNode}, including the {@link org.apache.samza.application.ApplicationDescriptorImpl}
+   * for this job
+   *
+   * @param jobNode job node in the {@link JobGraph}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.JobNodeJson}
+   */
+  private JobNodeJson buildJobNodeJson(JobNode jobNode) {
+    JobNodeJson job = new JobNodeJson();
+    job.jobName = jobNode.getJobName();
+    job.jobId = jobNode.getJobId();
+    job.operatorGraph = buildOperatorGraphJson(jobNode);
+    return job;
+  }
+
+  /**
+   * Traverse the {@link OperatorSpec} graph and build the operator graph JSON POJO.
+   * @param jobNode job node in the {@link JobGraph}
+   * @return {@link org.apache.samza.execution.JobGraphJsonGenerator.OperatorGraphJson}
+   */
+  private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
+    OperatorGraphJson opGraph = new OperatorGraphJson();
+    opGraph.inputStreams = new ArrayList<>();
+    jobNode.getInEdges().values().forEach(inStream -> {
+        StreamJson inputJson = new StreamJson();
+        opGraph.inputStreams.add(inputJson);
+        inputJson.streamId = inStream.getStreamSpec().getId();
+        inputJson.nextOperatorIds = jobNode.getNextOperatorIds(inputJson.streamId);
+        updateOperatorGraphJson(jobNode.getInputOperator(inputJson.streamId), opGraph);
+      });
+
+    opGraph.outputStreams = new ArrayList<>();
+    jobNode.getOutEdges().values().forEach(outStream -> {
+        StreamJson outputJson = new StreamJson();
+        outputJson.streamId = outStream.getStreamSpec().getId();
+        opGraph.outputStreams.add(outputJson);
+      });
+    return opGraph;
+  }
+
+  /**
    * Get or create the JSON POJO for a {@link StreamEdge}
    * @param edge {@link StreamEdge}
    * @param streamEdges map of streamId to {@link org.apache.samza.execution.JobGraphJsonGenerator.StreamEdgeJson}
@@ -261,15 +258,11 @@ import org.codehaus.jackson.map.ObjectMapper;
     edgeJson.streamSpec = streamSpecJson;

     List<String> sourceJobs = new ArrayList<>();
-    edge.getSourceNodes().forEach(jobNode -> {
-        sourceJobs.add(jobNode.getJobName());
-      });
+    edge.getSourceNodes().forEach(jobNode -> sourceJobs.add(jobNode.getJobName()));
     edgeJson.sourceJobs = sourceJobs;

     List<String> targetJobs = new ArrayList<>();
-    edge.getTargetNodes().forEach(jobNode -> {
-        targetJobs.add(jobNode.getJobName());
-      });
+    edge.getTargetNodes().forEach(jobNode -> targetJobs.add(jobNode.getJobName()));
     edgeJson.targetJobs = targetJobs;

     streamEdges.put(streamId, edgeJson);
@@ -285,12 +278,7 @@ import org.codehaus.jackson.map.ObjectMapper;
    */
   private TableSpecJson buildTableJson(TableSpec tableSpec, Map<String, TableSpecJson> tableSpecs) {
     String tableId = tableSpec.getId();
-    TableSpecJson tableSpecJson = tableSpecs.get(tableId);
-    if (tableSpecJson == null) {
-      tableSpecJson = buildTableJson(tableSpec);
-      tableSpecs.put(tableId, tableSpecJson);
-    }
-    return tableSpecJson;
+    return tableSpecs.computeIfAbsent(tableId, k -> buildTableJson(tableSpec));
   }

   /**
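The last hunk above replaces a manual get/null-check/put sequence with a single `Map.computeIfAbsent` call. A minimal standalone sketch of the idiom (class and method names hypothetical, not Samza code):

```java
import java.util.HashMap;
import java.util.Map;

public class ComputeIfAbsentSketch {
  // Hypothetical stand-in for the tableId -> TableSpecJson cache in the diff.
  static final Map<String, String> CACHE = new HashMap<>();

  // computeIfAbsent only invokes the mapping function when the key is missing,
  // so repeated calls return the cached value without rebuilding it.
  static String getOrCreate(String tableId) {
    return CACHE.computeIfAbsent(tableId, k -> "json-for-" + k);
  }

  public static void main(String[] args) {
    String first = getOrCreate("t1");
    String second = getOrCreate("t1"); // cache hit; mapping function runs once
    System.out.println(first.equals(second)); // true
    System.out.println(CACHE.size()); // 1
  }
}
```

Besides being shorter, `computeIfAbsent` avoids the double lookup of the get-then-put version.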

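The topological-sort hunks at the top of this diff decrement an indegree map keyed by a unique job identifier (the switch from `getId()` to `getJobNameAndId()`), which is Kahn's algorithm over the job graph. A simplified standalone sketch with string node ids (not the Samza implementation; names are illustrative):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.Deque;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TopoSortSketch {
  // Kahn's algorithm: count incoming edges per node, seed the queue with
  // zero-indegree nodes, and emit nodes as their indegree drops to zero.
  static List<String> topoSort(Map<String, List<String>> outEdges) {
    Map<String, Long> indegree = new HashMap<>();
    outEdges.keySet().forEach(n -> indegree.putIfAbsent(n, 0L));
    outEdges.values().forEach(targets ->
        targets.forEach(t -> indegree.merge(t, 1L, Long::sum)));

    Deque<String> q = new ArrayDeque<>();
    indegree.forEach((n, d) -> { if (d == 0L) q.add(n); });

    List<String> sorted = new ArrayList<>();
    while (!q.isEmpty()) {
      String node = q.poll();
      sorted.add(node);
      // Mirror of the diff: decrement indegree of each downstream node,
      // enqueueing it once all of its upstream nodes have been emitted.
      for (String t : outEdges.getOrDefault(node, Collections.emptyList())) {
        if (indegree.merge(t, -1L, Long::sum) == 0L) {
          q.add(t);
        }
      }
    }
    return sorted;
  }

  public static void main(String[] args) {
    Map<String, List<String>> g = new HashMap<>();
    g.put("ingest-1", Arrays.asList("repartition-1"));
    g.put("repartition-1", Arrays.asList("join-1"));
    System.out.println(topoSort(g)); // [ingest-1, repartition-1, join-1]
  }
}
```

Keying the map by a name-plus-id string rather than the job name alone matters when several job nodes share a name but differ in id; a collision would corrupt the indegree counts.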