SAMZA-1745: Remove all usages of StreamSpec and ApplicationRunner from the operator spec and impl layers.
This PR is a prerequisite for adding support for user-provided SystemDescriptors and StreamDescriptors to the High Level API. It removes all usages of StreamSpec and ApplicationRunner from the OperatorSpec and OperatorImpl layers.

DAG specification (StreamGraphSpec, OperatorSpecs) now relies only on logical streamIds, and in the future it will use the user-provided StreamDescriptors. DAG execution (i.e., StreamOperatorTask, OperatorImpls) now relies only on logical streamIds and their corresponding SystemStreams, which are obtained using StreamConfig in OperatorImplGraph.

After this change, StreamSpec can be thought of as the API between StreamManager and SystemAdmins for creating and validating streams. Ideally ExecutionPlanner shouldn't rely on StreamSpec either, but it currently does so extensively, so I'll leave that refactor for later.

Additional changes:

1. ApplicationRunner is no longer responsible for creating/returning StreamSpec instances. Instances can be created directly using the StreamSpec constructors, or by using one of the util methods in the new StreamUtil class.

2. The StreamSpec class no longer tracks the isBroadcast and isBounded status for streams.
   - isBroadcast was used to communicate broadcast status from StreamGraphSpec to the planner so that it could write the broadcast input configurations. This is now done using a separate Set of broadcast streamIds in StreamGraphSpec.
   - isBounded was set by the ApplicationRunner based on a config, and then passed to the planner so that it could write the bounded input configs. Since the value already came from config, this round trip was redundant, so I removed it.
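Item 2 moves broadcast status out of StreamSpec and into a separate Set of broadcast streamIds. The Java sketch below is a simplified, self-contained model of that flow, not Samza code: `BroadcastInputsSketch`, `Edge` and `splitInputs` are hypothetical names. It mirrors two changes visible in the diff: a broadcast edge is pinned to one partition (as the new StreamEdge constructor does), and broadcast inputs get a `#0` suffix while other inputs are listed by name (as the input loop in JobNode does). The `<system>.<physicalName>` name format is an assumption modeled on `getNameFromSystemStream`.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical sketch (not Samza code): broadcast status is looked up in a separate
 * Set of streamIds instead of being carried on each stream's spec.
 */
public class BroadcastInputsSketch {
  static final int PARTITIONS_UNKNOWN = -1;

  /** Simplified stand-in for an execution-graph stream edge. */
  static final class Edge {
    final String streamId;
    final String name; // assumed "<system>.<physicalName>" format
    final boolean isBroadcast;
    int partitions = PARTITIONS_UNKNOWN;

    Edge(String streamId, String system, String physicalName, Set<String> broadcastStreamIds) {
      this.streamId = streamId;
      this.name = system + "." + physicalName;
      this.isBroadcast = broadcastStreamIds.contains(streamId);
      if (isBroadcast) {
        // Broadcast edges are pinned to a single partition, like StreamEdge in the diff.
        partitions = 1;
      }
    }
  }

  /** Splits input edges into regular inputs and broadcast inputs (with a "#0" suffix). */
  static Map<String, List<String>> splitInputs(List<Edge> inEdges) {
    List<String> inputs = new ArrayList<>();
    List<String> broadcasts = new ArrayList<>();
    for (Edge edge : inEdges) {
      if (edge.isBroadcast) {
        broadcasts.add(edge.name + "#0");
      } else {
        inputs.add(edge.name);
      }
    }
    Map<String, List<String>> result = new LinkedHashMap<>();
    result.put("inputs", inputs);
    result.put("broadcasts", broadcasts);
    return result;
  }

  public static void main(String[] args) {
    Set<String> broadcastStreamIds = new HashSet<>(Arrays.asList("config-updates"));
    List<Edge> edges = Arrays.asList(
        new Edge("page-views", "kafka", "PageViewEvent", broadcastStreamIds),
        new Edge("config-updates", "kafka", "ConfigUpdates", broadcastStreamIds));
    // Prints {inputs=[kafka.PageViewEvent], broadcasts=[kafka.ConfigUpdates#0]}
    System.out.println(splitInputs(edges));
  }
}
```

Keeping the Set keyed by logical streamId means the spec layer never needs a StreamSpec just to answer "is this stream broadcast?", which matches the goal of the commit.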
Author: Prateek Maheshwari
Author: prateekm
Reviewers: Jagadish Venkatraman, Yi Pan, Cameron Lee

Closes #552 from prateekm/stream-spec-cleanup

Project: http://git-wip-us.apache.org/repos/asf/samza/repo
Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/440a25c9
Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/440a25c9
Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/440a25c9

Branch: refs/heads/master
Commit: 440a25c97e7c0e7e7339c8cb08813ef08f4cdf19
Parents: 43c36e6
Author: Prateek Maheshwari
Authored: Fri Jul 27 11:24:00 2018 -0700
Committer: Prateek Maheshwari
Committed: Fri Jul 27 11:24:00 2018 -0700

----------------------------------------------------------------------
 .../samza/operators/functions/MapFunction.java | 2 +-
 .../apache/samza/runtime/ApplicationRunner.java | 22 --
 .../org/apache/samza/system/StreamSpec.java | 56 +--
 .../org/apache/samza/config/TaskConfigJava.java | 3 +-
 .../samza/execution/ExecutionPlanner.java | 22 +-
 .../org/apache/samza/execution/JobGraph.java | 18 +-
 .../samza/execution/JobGraphJsonGenerator.java | 12 +-
 .../org/apache/samza/execution/JobNode.java | 18 +-
 .../org/apache/samza/execution/StreamEdge.java | 34 +-
 .../apache/samza/execution/StreamManager.java | 3 +-
 .../samza/operators/OperatorSpecGraph.java | 15 +-
 .../apache/samza/operators/StreamGraphSpec.java | 79 ++--
 .../operators/impl/BroadcastOperatorImpl.java | 4 +-
 .../samza/operators/impl/OperatorImplGraph.java | 62 +--
 .../operators/impl/OutputOperatorImpl.java | 4 +-
 .../operators/impl/PartitionByOperatorImpl.java | 11 +-
 .../samza/operators/spec/InputOperatorSpec.java | 12 +-
 .../samza/operators/spec/OperatorSpecs.java | 7 +-
 .../samza/operators/spec/OutputStreamImpl.java | 18 +-
 .../stream/IntermediateMessageStreamImpl.java | 7 +-
 .../runtime/AbstractApplicationRunner.java | 64 +--
 .../samza/storage/ChangelogStreamManager.java | 4 +-
 .../apache/samza/storage/StorageRecovery.java | 3 +-
 .../java/org/apache/samza/util/StreamUtil.java | 90 +++++
 .../org/apache/samza/config/StorageConfig.scala | 5 +-
 .../org/apache/samza/config/TaskConfig.scala | 4 +-
 .../apache/samza/container/SamzaContainer.scala | 4 +-
 .../samza/job/local/ThreadJobFactory.scala | 4 +-
 .../MetricsSnapshotReporterFactory.scala | 5 +-
 .../main/scala/org/apache/samza/util/Util.scala | 42 --
 .../samza/execution/TestExecutionPlanner.java | 42 +-
 .../apache/samza/execution/TestJobGraph.java | 28 +-
 .../execution/TestJobGraphJsonGenerator.java | 45 +--
 .../org/apache/samza/execution/TestJobNode.java | 17 +-
 .../apache/samza/execution/TestStreamEdge.java | 16 +-
 .../samza/operators/TestJoinOperator.java | 11 +-
 .../samza/operators/TestOperatorSpecGraph.java | 23 +-
 .../samza/operators/TestStreamGraphSpec.java | 336 ++++++----------
 .../operators/impl/TestOperatorImplGraph.java | 272 +++++++------
 .../operators/impl/TestWindowOperator.java | 16 +-
 .../operators/spec/OperatorSpecTestUtils.java | 14 +-
 .../samza/operators/spec/TestOperatorSpec.java | 18 +-
 .../spec/TestPartitionByOperatorSpec.java | 14 +-
 .../runtime/TestAbstractApplicationRunner.java | 391 -------------------
 .../apache/samza/task/TestTaskFactoryUtil.java | 5 +-
 .../apache/samza/testUtils/StreamTestUtils.java | 39 ++
 .../org/apache/samza/util/TestStreamUtil.java | 337 ++++++++++++++++
 .../samza/system/kafka/KafkaStreamSpec.java | 15 +-
 .../org/apache/samza/config/KafkaConfig.scala | 4 +-
 .../samza/config/RegExTopicGenerator.scala | 8 +-
 .../samza/system/kafka/KafkaSystemAdmin.scala | 4 +-
 .../samza/system/kafka/TestKafkaStreamSpec.java | 6 +-
 .../kafka/TestKafkaCheckpointManager.scala | 4 +-
 .../sql/translator/TestQueryTranslator.java | 235 ++++++-----
 .../test/integration/NegateNumberTask.java | 4 +-
 .../test/performance/TestPerformanceTask.scala | 4 +-
 .../test/operator/RepartitionJoinWindowApp.java | 13 +-
 .../operator/TestRepartitionJoinWindowApp.java | 7 +-
 58 files changed, 1214 insertions(+), 1348 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
index fad9cf8..e2f5d0d 100644
--- a/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
+++ b/samza-api/src/main/java/org/apache/samza/operators/functions/MapFunction.java
@@ -30,7 +30,7 @@ import org.apache.samza.annotation.InterfaceStability;
  */
 @InterfaceStability.Unstable
 @FunctionalInterface
-public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {
+public interface MapFunction<M, OM> extends InitableFunction, ClosableFunction, Serializable {

   /**
    * Transforms the provided message into another message.
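The ApplicationRunner diff that follows deletes `getStreamSpec(String streamId)`, whose Javadoc documents how a logical streamId is bound to a system and a physical name through `streams.{$streamId}.*` properties. As a rough illustration of those rules, here is a self-contained Java sketch. `StreamIdResolutionSketch`, `ResolvedStream` and `resolve` are hypothetical names; this is not the API of the new StreamUtil class or of StreamConfig, and it assumes the config is a plain `Map<String, String>`. Following the removed Javadoc, `samza.system` falls back to `job.default.system`, `samza.physical.name` falls back to the streamId, and every other property under the prefix is kept as system-specific config.

```java
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch (not Samza's StreamUtil/StreamConfig API): resolves a logical streamId
 * using the "streams.<streamId>.*" rules described in the removed getStreamSpec Javadoc.
 */
public class StreamIdResolutionSketch {
  static final String SYSTEM_PROPERTY = "samza.system";
  static final String PHYSICAL_NAME_PROPERTY = "samza.physical.name";
  static final String DEFAULT_SYSTEM_KEY = "job.default.system";

  /** Simplified stand-in for the fields of a StreamSpec. */
  static final class ResolvedStream {
    final String id;
    final String systemName;
    final String physicalName;
    final Map<String, String> config;

    ResolvedStream(String id, String systemName, String physicalName, Map<String, String> config) {
      this.id = id;
      this.systemName = systemName;
      this.physicalName = physicalName;
      this.config = config;
    }

    @Override
    public String toString() {
      return String.format("id=%s, systemName=%s, pName=%s, config=%s", id, systemName, physicalName, config);
    }
  }

  static ResolvedStream resolve(String streamId, Map<String, String> config) {
    String prefix = "streams." + streamId + ".";
    // samza.system falls back to job.default.system.
    String systemName = config.getOrDefault(prefix + SYSTEM_PROPERTY, config.get(DEFAULT_SYSTEM_KEY));
    if (systemName == null) {
      throw new IllegalArgumentException("No system configured for stream " + streamId);
    }
    // samza.physical.name falls back to the streamId itself.
    String physicalName = config.getOrDefault(prefix + PHYSICAL_NAME_PROPERTY, streamId);

    // Every other streams.<streamId>.* property is treated as system-specific.
    Map<String, String> streamConfig = new TreeMap<>();
    for (Map.Entry<String, String> entry : config.entrySet()) {
      String key = entry.getKey();
      if (key.startsWith(prefix)) {
        String property = key.substring(prefix.length());
        if (!property.equals(SYSTEM_PROPERTY) && !property.equals(PHYSICAL_NAME_PROPERTY)) {
          streamConfig.put(property, entry.getValue());
        }
      }
    }
    return new ResolvedStream(streamId, systemName, physicalName, streamConfig);
  }

  public static void main(String[] args) {
    Map<String, String> config = new TreeMap<>();
    config.put("job.default.system", "kafka");
    config.put("streams.page-views.samza.physical.name", "PageViewEvent");
    config.put("streams.page-views.replication.factor", "3");
    // Prints id=page-views, systemName=kafka, pName=PageViewEvent, config={replication.factor=3}
    System.out.println(resolve("page-views", config));
    // Prints id=clicks, systemName=kafka, pName=clicks, config={}
    System.out.println(resolve("clicks", config));
  }
}
```

Because the lookup needs only the streamId and the config, it can run wherever a config is available (for example in the planner or in OperatorImplGraph) without going through ApplicationRunner.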
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
index 8339429..45abb5d 100644
--- a/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
+++ b/samza-api/src/main/java/org/apache/samza/runtime/ApplicationRunner.java
@@ -24,7 +24,6 @@ import org.apache.samza.application.StreamApplication;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.ConfigException;
 import org.apache.samza.job.ApplicationStatus;
-import org.apache.samza.system.StreamSpec;

 import java.lang.reflect.Constructor;

@@ -124,25 +123,4 @@ public abstract class ApplicationRunner {
   public boolean waitForFinish(Duration timeout) {
     throw new UnsupportedOperationException(getClass().getName() + " does not support timed waitForFinish.");
   }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with two exceptions. The following two
-   * properties are Samza properties which are used to bind the stream to a system and a physical resource on that system.
-   *
-   * <ul>
-   *   <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
-   *                     the stream will be associated with the System defined in {@code job.default.system}</li>
-   *   <li>samza.physical.name - The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   *                            If this property isn't defined the physical.name will be set to the streamId</li>
-   * </ul>
-   *
-   * @param streamId The logical identifier for the stream in Samza.
-   * @return The {@link StreamSpec} instance.
-   */
-  public abstract StreamSpec getStreamSpec(String streamId);
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
----------------------------------------------------------------------
diff --git a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
index cd86426..aa71f0e 100644
--- a/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
+++ b/samza-api/src/main/java/org/apache/samza/system/StreamSpec.java
@@ -76,24 +76,10 @@ public class StreamSpec implements Serializable {
   private final int partitionCount;

   /**
-   * Bounded or unbounded stream
-   */
-  private final boolean isBounded;
-
-  /**
-   * broadcast stream to all tasks
-   */
-  private final boolean isBroadcast;
-
-  /**
    * A set of all system-specific configurations for the stream.
    */
   private final Map<String, String> config;

-  @Override
-  public String toString() {
-    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
-  }
   /**
    * @param id The application-unique logical identifier for the stream. It is used to distinguish between
    *           streams in a Samza application so it must be unique in the context of one deployable unit.
@@ -107,7 +93,7 @@ public class StreamSpec implements Serializable {
    *                   Samza System abstraction. See {@link SystemFactory}
    */
   public StreamSpec(String id, String physicalName, String systemName) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, false, false, Collections.emptyMap());
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, Collections.emptyMap());
   }

   /**
@@ -126,7 +112,7 @@ public class StreamSpec implements Serializable {
    * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
    */
   public StreamSpec(String id, String physicalName, String systemName, int partitionCount) {
-    this(id, physicalName, systemName, partitionCount, false, false, Collections.emptyMap());
+    this(id, physicalName, systemName, partitionCount, Collections.emptyMap());
   }

   /**
@@ -141,12 +127,10 @@ public class StreamSpec implements Serializable {
    * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the
    *                   Samza System abstraction. See {@link SystemFactory}
    *
-   * @param isBounded The stream is bounded or not.
-   *
    * @param config A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, boolean isBounded, Map<String, String> config) {
-    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, isBounded, false, config);
+  public StreamSpec(String id, String physicalName, String systemName, Map<String, String> config) {
+    this(id, physicalName, systemName, DEFAULT_PARTITION_COUNT, config);
   }

   /**
@@ -161,16 +145,11 @@ public class StreamSpec implements Serializable {
    * @param systemName The System name on which this stream will exist. Corresponds to a named implementation of the
    *                   Samza System abstraction. See {@link SystemFactory}
    *
-   * @param partitionCount The number of partitionts for the stream. A value of {@code 1} indicates unpartitioned.
-   *
-   * @param isBounded The stream is bounded or not.
-   *
-   * @param isBroadcast This stream is broadcast or not.
+   * @param partitionCount The number of partitions for the stream. A value of {@code 1} indicates unpartitioned.
    *
    * @param config A map of properties for the stream. These may be System-specfic.
    */
-  public StreamSpec(String id, String physicalName, String systemName, int partitionCount,
-      boolean isBounded, boolean isBroadcast, Map<String, String> config) {
+  public StreamSpec(String id, String physicalName, String systemName, int partitionCount, Map<String, String> config) {
     validateLogicalIdentifier("streamId", id);
     validateLogicalIdentifier("systemName", systemName);

@@ -183,8 +162,6 @@ public class StreamSpec implements Serializable {
     this.systemName = systemName;
     this.physicalName = physicalName;
     this.partitionCount = partitionCount;
-    this.isBounded = isBounded;
-    this.isBroadcast = isBroadcast;

     if (config != null) {
       this.config = Collections.unmodifiableMap(new HashMap<>(config));
@@ -202,15 +179,11 @@ public class StreamSpec implements Serializable {
    * @return A copy of this StreamSpec with the specified partitionCount.
    */
   public StreamSpec copyWithPartitionCount(int partitionCount) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, this.isBroadcast, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
   }

   public StreamSpec copyWithPhysicalName(String physicalName) {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, this.isBroadcast, config);
-  }
-
-  public StreamSpec copyWithBroadCast() {
-    return new StreamSpec(id, physicalName, systemName, partitionCount, this.isBounded, true, config);
+    return new StreamSpec(id, physicalName, systemName, partitionCount, config);
   }

   public String getId() {
@@ -253,14 +226,6 @@ public class StreamSpec implements Serializable {
     return id.equals(COORDINATOR_STREAM_ID);
   }

-  public boolean isBounded() {
-    return isBounded;
-  }
-
-  public boolean isBroadcast() {
-    return isBroadcast;
-  }
-
   private void validateLogicalIdentifier(String identifierName, String identifierValue) {
     if (identifierValue == null || !identifierValue.matches("[A-Za-z0-9_-]+")) {
       throw new IllegalArgumentException(String.format("Identifier '%s' is '%s'. It must match the expression [A-Za-z0-9_-]+", identifierName, identifierValue));
@@ -297,4 +262,9 @@ public class StreamSpec implements Serializable {
   public static StreamSpec createStreamAppenderStreamSpec(String physicalName, String systemName, int partitionCount) {
     return new StreamSpec(STREAM_APPENDER_ID, physicalName, systemName, partitionCount);
   }
+
+  @Override
+  public String toString() {
+    return String.format("StreamSpec: id=%s, systemName=%s, pName=%s, partCount=%d.", id, systemName, physicalName, partitionCount);
+  }
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
index 29dd3ef..c5b2183 100644
--- a/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
+++ b/samza-core/src/main/java/org/apache/samza/config/TaskConfigJava.java
@@ -32,6 +32,7 @@ import org.apache.samza.checkpoint.CheckpointManagerFactory;
 import org.apache.samza.metrics.MetricsRegistry;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamPartition;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -102,7 +103,7 @@ public class TaskConfigJava extends MapConfig {
       } else {
         String systemStreamName = systemStreamPartition.substring(0, hashPosition);
         String partitionSegment = systemStreamPartition.substring(hashPosition + 1);
-        SystemStream systemStream = Util.getSystemStreamFromNames(systemStreamName);
+        SystemStream systemStream = StreamUtil.getSystemStreamFromNames(systemStreamName);

         if (Pattern.matches(BROADCAST_STREAM_PATTERN, partitionSegment)) {
           systemStreamPartitionSet.add(new SystemStreamPartition(systemStream, new Partition(Integer.valueOf(partitionSegment))));
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
index 48f939c..ef52e90 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/ExecutionPlanner.java
@@ -34,6 +34,7 @@ import org.apache.samza.config.ApplicationConfig;
 import org.apache.samza.config.ClusterManagerConfig;
 import org.apache.samza.config.Config;
 import org.apache.samza.config.JobConfig;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.spec.JoinOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
@@ -43,6 +44,8 @@ import org.apache.samza.table.TableSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

+import static org.apache.samza.util.StreamUtil.*;
+

 /**
  * The ExecutionPlanner creates the physical execution graph for the StreamGraph, and
@@ -93,8 +96,9 @@ public class ExecutionPlanner {
    */
   /* package private */ JobGraph createJobGraph(OperatorSpecGraph specGraph) {
     JobGraph jobGraph = new JobGraph(config, specGraph);
-    Set<StreamSpec> sourceStreams = new HashSet<>(specGraph.getInputOperators().keySet());
-    Set<StreamSpec> sinkStreams = new HashSet<>(specGraph.getOutputStreams().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<StreamSpec> sourceStreams = getStreamSpecs(specGraph.getInputOperators().keySet(), streamConfig);
+    Set<StreamSpec> sinkStreams = getStreamSpecs(specGraph.getOutputStreams().keySet(), streamConfig);
     Set<StreamSpec> intStreams = new HashSet<>(sourceStreams);
     Set<TableSpec> tables = new HashSet<>(specGraph.getTables().keySet());
     intStreams.retainAll(sinkStreams);
@@ -128,7 +132,7 @@ public class ExecutionPlanner {
    */
   /* package private */ void calculatePartitions(JobGraph jobGraph) {
     // calculate the partitions for the input streams of join operators
-    calculateJoinInputPartitions(jobGraph);
+    calculateJoinInputPartitions(jobGraph, config);

     // calculate the partitions for the rest of intermediate streams
     calculateIntStreamPartitions(jobGraph, config);
@@ -172,7 +176,7 @@ public class ExecutionPlanner {
   /**
    * Calculate the partitions for the input streams of join operators
    */
-  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph) {
+  /* package private */ static void calculateJoinInputPartitions(JobGraph jobGraph, Config config) {
     // mapping from a source stream to all join specs reachable from it
     Multimap<OperatorSpec, StreamEdge> joinSpecToStreamEdges = HashMultimap.create();
     // reverse mapping of the above
@@ -183,10 +187,10 @@ public class ExecutionPlanner {
     Set<OperatorSpec> visited = new HashSet<>();

     jobGraph.getSpecGraph().getInputOperators().entrySet().forEach(entry -> {
-      StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(entry.getKey());
+      StreamConfig streamConfig = new StreamConfig(config);
+      StreamEdge streamEdge = jobGraph.getOrCreateStreamEdge(getStreamSpec(entry.getKey(), streamConfig));
       // Traverses the StreamGraph to find and update mappings for all Joins reachable from this input StreamEdge
-      findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs,
-          joinQ, visited);
+      findReachableJoins(entry.getValue(), streamEdge, joinSpecToStreamEdges, streamEdgeToJoinSpecs, joinQ, visited);
     });

     // At this point, joinQ contains joinSpecs where at least one of the input stream edge partitions is known.
@@ -203,7 +207,7 @@ public class ExecutionPlanner {
         } else if (partitions != edgePartitions) {
           throw new SamzaException(String.format(
               "Unable to resolve input partitions of stream %s for join. Expected: %d, Actual: %d",
-              edge.getFormattedSystemStream(), partitions, edgePartitions));
+              edge.getName(), partitions, edgePartitions));
         }
       }
     }
@@ -282,7 +286,7 @@ public class ExecutionPlanner {
   private static void validatePartitions(JobGraph jobGraph) {
     for (StreamEdge edge : jobGraph.getIntermediateStreamEdges()) {
       if (edge.getPartitionCount() <= 0) {
-        throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getFormattedSystemStream()));
+        throw new SamzaException(String.format("Failure to assign the partitions to Stream %s", edge.getName()));
       }
     }
   }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
index 843db85..2f210f2 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraph.java
@@ -87,7 +87,7 @@ import org.slf4j.LoggerFactory;
   @Override
   public List<StreamSpec> getIntermediateStreams() {
     return getIntermediateStreamEdges().stream()
-        .map(streamEdge -> streamEdge.getStreamSpec())
+        .map(StreamEdge::getStreamSpec)
         .collect(Collectors.toList());
   }

@@ -187,12 +187,10 @@ import org.slf4j.LoggerFactory;
     String streamId = streamSpec.getId();
     StreamEdge edge = edges.get(streamId);
     if (edge == null) {
-      edge = new StreamEdge(streamSpec, isIntermediate, config);
+      boolean isBroadcast = specGraph.getBroadcastStreams().contains(streamId);
+      edge = new StreamEdge(streamSpec, isIntermediate, isBroadcast, config);
       edges.put(streamId, edge);
     }
-    if (streamSpec.isBroadcast()) {
-      edge.setPartitionCount(1);
-    }
     return edge;
   }

@@ -262,11 +260,11 @@ import org.slf4j.LoggerFactory;
     sources.forEach(edge -> {
       if (!edge.getSourceNodes().isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("Source stream %s should not have producers.", edge.getFormattedSystemStream()));
+            String.format("Source stream %s should not have producers.", edge.getName()));
       }
       if (edge.getTargetNodes().isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("Source stream %s should have consumers.", edge.getFormattedSystemStream()));
+            String.format("Source stream %s should have consumers.", edge.getName()));
       }
     });
   }
@@ -278,11 +276,11 @@ import org.slf4j.LoggerFactory;
     sinks.forEach(edge -> {
       if (!edge.getTargetNodes().isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("Sink stream %s should not have consumers", edge.getFormattedSystemStream()));
+            String.format("Sink stream %s should not have consumers", edge.getName()));
       }
       if (edge.getSourceNodes().isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("Sink stream %s should have producers", edge.getFormattedSystemStream()));
+            String.format("Sink stream %s should have producers", edge.getName()));
       }
     });
   }
@@ -298,7 +296,7 @@ import org.slf4j.LoggerFactory;
     internalEdges.forEach(edge -> {
       if (edge.getSourceNodes().isEmpty() || edge.getTargetNodes().isEmpty()) {
         throw new IllegalArgumentException(
-            String.format("Internal stream %s should have both producers and consumers", edge.getFormattedSystemStream()));
+            String.format("Internal stream %s should have both producers and consumers", edge.getName()));
       }
     });
   }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
index 298042b..4f2aa23 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobGraphJsonGenerator.java
@@ -170,10 +170,10 @@ import org.codehaus.jackson.map.ObjectMapper;
   private OperatorGraphJson buildOperatorGraphJson(JobNode jobNode) {
     OperatorGraphJson opGraph = new OperatorGraphJson();
     opGraph.inputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getInputOperators().forEach((streamSpec, operatorSpec) -> {
+    jobNode.getSpecGraph().getInputOperators().forEach((streamId, operatorSpec) -> {
       StreamJson inputJson = new StreamJson();
       opGraph.inputStreams.add(inputJson);
-      inputJson.streamId = streamSpec.getId();
+      inputJson.streamId = streamId;

       Collection<OperatorSpec> specs = operatorSpec.getRegisteredOperatorSpecs();
       inputJson.nextOperatorIds = specs.stream().map(OperatorSpec::getOpId).collect(Collectors.toSet());
@@ -181,9 +181,9 @@ import org.codehaus.jackson.map.ObjectMapper;
     });

     opGraph.outputStreams = new ArrayList<>();
-    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamSpec -> {
+    jobNode.getSpecGraph().getOutputStreams().keySet().forEach(streamId -> {
       StreamJson outputJson = new StreamJson();
-      outputJson.streamId = streamSpec.getId();
+      outputJson.streamId = streamId;
       opGraph.outputStreams.add(outputJson);
     });
     return opGraph;
@@ -219,10 +219,10 @@ import org.codehaus.jackson.map.ObjectMapper;

     if (spec instanceof OutputOperatorSpec) {
       OutputStreamImpl outputStream = ((OutputOperatorSpec) spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     } else if (spec instanceof PartitionByOperatorSpec) {
       OutputStreamImpl outputStream = ((PartitionByOperatorSpec) spec).getOutputStream();
-      map.put("outputStreamId", outputStream.getStreamSpec().getId());
+      map.put("outputStreamId", outputStream.getStreamId());
     }

     if (spec instanceof StreamTableJoinOperatorSpec) {
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
index 288b1a1..dba47e1 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/JobNode.java
@@ -49,8 +49,8 @@ import org.apache.samza.table.TableConfigGenerator;
 import org.apache.samza.util.MathUtil;
 import org.apache.samza.serializers.Serde;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -133,8 +133,8 @@ public class JobNode {
     final List<String> inputs = new ArrayList<>();
     final List<String> broadcasts = new ArrayList<>();
     for (StreamEdge inEdge : inEdges) {
-      String formattedSystemStream = inEdge.getFormattedSystemStream();
-      if (inEdge.getStreamSpec().isBroadcast()) {
+      String formattedSystemStream = inEdge.getName();
+      if (inEdge.isBroadcast()) {
         broadcasts.add(formattedSystemStream + "#0");
       } else {
         inputs.add(formattedSystemStream);
@@ -184,9 +184,9 @@ public class JobNode {
       List<String> sideInputs = tableSpec.getSideInputs();
       if (sideInputs != null && !sideInputs.isEmpty()) {
         sideInputs.stream()
-            .map(sideInput -> Util.getSystemStreamFromNameOrId(config, sideInput))
+            .map(sideInput -> StreamUtil.getSystemStreamFromNameOrId(config, sideInput))
             .forEach(systemStream -> {
-              inputs.add(Util.getNameFromSystemStream(systemStream));
+              inputs.add(StreamUtil.getNameFromSystemStream(systemStream));
               configs.put(String.format(StreamConfig.STREAM_PREFIX() + StreamConfig.BOOTSTRAP(),
                   systemStream.getSystem(), systemStream.getStream()), "true");
             });
@@ -230,17 +230,17 @@ public class JobNode {
     // collect all key and msg serde instances for streams
     Map<String, Serde> streamKeySerdes = new HashMap<>();
     Map<String, Serde> streamMsgSerdes = new HashMap<>();
-    Map<StreamSpec, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
+    Map<String, InputOperatorSpec> inputOperators = specGraph.getInputOperators();
     inEdges.forEach(edge -> {
       String streamId = edge.getStreamSpec().getId();
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
       streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
       streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
     });
-    Map<StreamSpec, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
+    Map<String, OutputStreamImpl> outputStreams = specGraph.getOutputStreams();
     outEdges.forEach(edge -> {
       String streamId = edge.getStreamSpec().getId();
-      OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
       streamKeySerdes.put(streamId, outputStream.getKeySerde());
       streamMsgSerdes.put(streamId, outputStream.getValueSerde());
     });
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
index b4c93d9..f2f0310 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamEdge.java
@@ -29,7 +29,7 @@ import org.apache.samza.config.MapConfig;
 import org.apache.samza.config.StreamConfig;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;


 /**
@@ -41,23 +41,24 @@ public class StreamEdge {
   public static final int PARTITIONS_UNKNOWN = -1;

   private final StreamSpec streamSpec;
+  private final boolean isBroadcast;
+  private final boolean isIntermediate;
   private final List<JobNode> sourceNodes = new ArrayList<>();
   private final List<JobNode> targetNodes = new ArrayList<>();
   private final Config config;
+  private final String name;

-  private String name = "";
   private int partitions = PARTITIONS_UNKNOWN;
-  private final boolean isIntermediate;
-
-  StreamEdge(StreamSpec streamSpec, Config config) {
-    this(streamSpec, false, config);
-  }

-  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, Config config) {
+  StreamEdge(StreamSpec streamSpec, boolean isIntermediate, boolean isBroadcast, Config config) {
     this.streamSpec = streamSpec;
-    this.name = Util.getNameFromSystemStream(getSystemStream());
     this.isIntermediate = isIntermediate;
+    this.isBroadcast = isBroadcast;
     this.config = config;
+    if (isBroadcast) {
+      partitions = 1;
+    }
+    this.name = StreamUtil.getNameFromSystemStream(getSystemStream());
   }

   void addSourceNode(JobNode sourceNode) {
@@ -85,10 +86,6 @@ public class StreamEdge {
     return getStreamSpec().toSystemStream();
   }

-  String getFormattedSystemStream() {
-    return Util.getNameFromSystemStream(getSystemStream());
-  }
-
   List<JobNode> getSourceNodes() {
     return sourceNodes;
   }
@@ -109,10 +106,6 @@ public class StreamEdge {
     return name;
   }

-  void setName(String name) {
-    this.name = name;
-  }
-
   boolean isIntermediate() {
     return isIntermediate;
   }
@@ -128,12 +121,13 @@ public class StreamEdge {
       config.put(String.format(StreamConfig.CONSUMER_OFFSET_DEFAULT_FOR_STREAM_ID(), spec.getId()), "oldest");
       config.put(String.format(StreamConfig.PRIORITY_FOR_STREAM_ID(), spec.getId()), String.valueOf(Integer.MAX_VALUE));
     }
-    if (spec.isBounded()) {
-      config.put(String.format(StreamConfig.IS_BOUNDED_FOR_STREAM_ID(), spec.getId()), "true");
-    }
     spec.getConfig().forEach((property, value) -> {
       config.put(String.format(StreamConfig.STREAM_ID_PREFIX(), spec.getId()) + property, value);
     });
     return new MapConfig(config);
   }
+
+  public boolean isBroadcast() {
+    return isBroadcast;
+  }
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
index 7f60f96..2921f3b 100644
--- a/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/execution/StreamManager.java
@@ -38,6 +38,7 @@ import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemAdmins;
 import org.apache.samza.system.SystemStream;
 import org.apache.samza.system.SystemStreamMetadata;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -142,7 +143,7 @@ public class StreamManager {
             .getOrElse(defaultValue(null));
         if (changelog != null) {
           LOGGER.info("Clear store {} changelog {}", store, changelog);
-          SystemStream systemStream = Util.getSystemStreamFromNames(changelog);
+          SystemStream systemStream = StreamUtil.getSystemStreamFromNames(changelog);
           StreamSpec spec = StreamSpec.createChangeLogStreamSpec(systemStream.getStream(), systemStream.getSystem(), 1);
           systemAdmins.getSystemAdmin(spec.getSystemName()).clearStream(spec);
         }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
index ba51c7c..b6c3dae 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/OperatorSpecGraph.java
@@ -29,7 +29,6 @@ import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.serializers.SerializableSerde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;


@@ -41,8 +40,9 @@ import org.apache.samza.table.TableSpec;
  */
 public class OperatorSpecGraph implements Serializable {
   // We use a LHM for deterministic order in initializing and closing operators.
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators;
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams;
+  private final Map<String, InputOperatorSpec> inputOperators;
+  private final Map<String, OutputStreamImpl> outputStreams;
+  private final Set<String> broadcastStreams;
   private final Map<TableSpec, TableImpl> tables;
   private final Set<OperatorSpec> allOpSpecs;
   private final boolean hasWindowOrJoins;
@@ -54,20 +54,25 @@ public class OperatorSpecGraph implements Serializable {
   OperatorSpecGraph(StreamGraphSpec graphSpec) {
     this.inputOperators = graphSpec.getInputOperators();
     this.outputStreams = graphSpec.getOutputStreams();
+    this.broadcastStreams = graphSpec.getBroadcastStreams();
     this.tables = graphSpec.getTables();
     this.allOpSpecs = Collections.unmodifiableSet(this.findAllOperatorSpecs());
     this.hasWindowOrJoins = checkWindowOrJoins();
     this.serializedOpSpecGraph = opSpecGraphSerde.toBytes(this);
   }

-  public Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  public Map<String, InputOperatorSpec> getInputOperators() {
     return inputOperators;
   }

-  public Map<StreamSpec,
OutputStreamImpl> getOutputStreams() { + public Map<String, OutputStreamImpl> getOutputStreams() { return outputStreams; } + public Set<String> getBroadcastStreams() { + return broadcastStreams; + } + public Map<TableSpec, TableImpl> getTables() { return tables; } http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java index ea9690b..a187b94 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java @@ -34,11 +34,9 @@ import org.apache.samza.operators.spec.OperatorSpec.OpCode; import org.apache.samza.operators.spec.OperatorSpecs; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; -import org.apache.samza.runtime.ApplicationRunner; import org.apache.samza.serializers.KVSerde; import org.apache.samza.serializers.NoOpSerde; import org.apache.samza.serializers.Serde; -import org.apache.samza.system.StreamSpec; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; import org.slf4j.Logger; @@ -55,13 +53,13 @@ import com.google.common.base.Preconditions; */ public class StreamGraphSpec implements StreamGraph { private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class); - private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); + private static final Pattern ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); // We use a LHM for deterministic order in initializing and closing operators. 
-  private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
-  private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Map<String, InputOperatorSpec> inputOperators = new LinkedHashMap<>();
+  private final Map<String, OutputStreamImpl> outputStreams = new LinkedHashMap<>();
+  private final Set<String> broadcastStreams = new HashSet<>();
   private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>();
-  private final ApplicationRunner runner;
   private final Config config;

   /**
@@ -74,10 +72,7 @@ public class StreamGraphSpec implements StreamGraph {
   private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde());
   private ContextManager contextManager = null;

-  public StreamGraphSpec(ApplicationRunner runner, Config config) {
-    // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphSpec once Systems
-    // can use streamId to send and receive messages.
-    this.runner = runner;
+  public StreamGraphSpec(Config config) {
     this.config = config;
   }

@@ -91,15 +86,15 @@ public class StreamGraphSpec implements StreamGraph {

   @Override
   public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an input stream.");
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId),
         "getInputStream must not be called multiple times with the same streamId: " + streamId);

     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (outputStreams.containsKey(streamSpec)) {
-      OutputStreamImpl outputStream = outputStreams.get(streamSpec);
+    if (outputStreams.containsKey(streamId)) {
+      OutputStreamImpl outputStream = outputStreams.get(streamId);
       Serde keySerde = outputStream.getKeySerde();
       Serde valueSerde = outputStream.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
@@ -109,10 +104,10 @@ public class StreamGraphSpec implements StreamGraph {

     boolean isKeyed = serde instanceof KVSerde;
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    return new MessageStreamImpl<>(this, inputOperators.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    return new MessageStreamImpl<>(this, inputOperators.get(streamId));
   }

   @Override
@@ -122,15 +117,15 @@ public class StreamGraphSpec implements StreamGraph {

   @Override
   public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId);
+    Preconditions.checkState(isValidId(streamId),
+        "streamId must be non-empty and must not contain spaces or special characters: " + streamId);
     Preconditions.checkNotNull(serde, "serde must not be null for an output stream.");
-    Preconditions.checkState(!outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!outputStreams.containsKey(streamId),
         "getOutputStream must not be called multiple times with the same streamId: " + streamId);

     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
-    if (inputOperators.containsKey(streamSpec)) {
-      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec);
+    if (inputOperators.containsKey(streamId)) {
+      InputOperatorSpec inputOperatorSpec = inputOperators.get(streamId);
       Serde keySerde = inputOperatorSpec.getKeySerde();
       Serde valueSerde = inputOperatorSpec.getValueSerde();
       Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde),
@@ -139,8 +134,8 @@ public class StreamGraphSpec implements StreamGraph {
     }

     boolean isKeyed = serde instanceof KVSerde;
-    outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return outputStreams.get(streamSpec);
+    outputStreams.put(streamId, new OutputStreamImpl<>(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return outputStreams.get(streamId);
   }

   @Override
@@ -183,8 +178,8 @@ public class StreamGraphSpec implements StreamGraph {
    * @return the unique ID for the next operator in the graph
    */
   public String getNextOpId(OpCode opCode, String userDefinedId) {
-    if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) {
-      throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId);
+    if (StringUtils.isNotBlank(userDefinedId) && !ID_PATTERN.matcher(userDefinedId).matches()) {
+      throw new SamzaException("Operator ID must not contain spaces or special characters: " + userDefinedId);
     }

     String nextOpId = String.format("%s-%s-%s-%s",
@@ -234,17 +229,10 @@ public class StreamGraphSpec implements StreamGraph {
    * @param isBroadcast whether the stream is a broadcast stream.
    * @param <M> the type of messages in the intermediate {@link MessageStream}
    * @return the intermediate {@link MessageStreamImpl}
-   *
-   * TODO: once SAMZA-1566 is resolved, we should be able to pass in the StreamSpec directly.
    */
   @VisibleForTesting
   <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) {
-    StreamSpec streamSpec = runner.getStreamSpec(streamId);
-    if (isBroadcast) {
-      streamSpec = streamSpec.copyWithBroadCast();
-    }
-
-    Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec),
+    Preconditions.checkState(!inputOperators.containsKey(streamId) && !outputStreams.containsKey(streamId),
         "getIntermediateStream must not be called multiple times with the same streamId: " + streamId);

     if (serde == null) {
@@ -252,28 +240,37 @@ public class StreamGraphSpec implements StreamGraph {
       serde = (Serde<M>) defaultSerde;
     }

+    if (isBroadcast) broadcastStreams.add(streamId);
     boolean isKeyed = serde instanceof KVSerde;
     KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde);
     InputOperatorSpec inputOperatorSpec =
-        OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(),
+        OperatorSpecs.createInputOperatorSpec(streamId, kvSerdes.getKey(), kvSerdes.getValue(),
             isKeyed, this.getNextOpId(OpCode.INPUT, null));
-    inputOperators.put(streamSpec, inputOperatorSpec);
-    outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
-    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec));
+    inputOperators.put(streamId, inputOperatorSpec);
+    outputStreams.put(streamId, new OutputStreamImpl(streamId, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed));
+    return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamId), outputStreams.get(streamId));
   }

-  Map<StreamSpec, InputOperatorSpec> getInputOperators() {
+  Map<String, InputOperatorSpec> getInputOperators() {
     return Collections.unmodifiableMap(inputOperators);
   }

-  Map<StreamSpec, OutputStreamImpl> getOutputStreams() {
+  Map<String, OutputStreamImpl> getOutputStreams() {
     return Collections.unmodifiableMap(outputStreams);
   }

+  Set<String> getBroadcastStreams() {
+    return Collections.unmodifiableSet(broadcastStreams);
+  }
+
   Map<TableSpec, TableImpl> getTables() {
     return Collections.unmodifiableMap(tables);
   }

+  private boolean isValidId(String id) {
+    return StringUtils.isNotBlank(id) && ID_PATTERN.matcher(id).matches();
+  }
+
   private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) {
     Serde keySerde, valueSerde;

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
index 8df670e..99ed089 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java
@@ -40,9 +40,9 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final SystemStream systemStream;
   private final String taskName;

-  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, TaskContext context) {
+  BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, SystemStream systemStream, TaskContext context) {
     this.broadcastOpSpec = broadcastOpSpec;
-    this.systemStream = broadcastOpSpec.getOutputStream().getSystemStream();
+    this.systemStream = systemStream;
     this.taskName = context.getTaskName().getTaskName();
   }

http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
index df73e48..7f62e00 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java
@@ -22,6 +22,7 @@ import com.google.common.collect.HashMultimap;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Multimap;
 import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
 import org.apache.samza.container.TaskContextImpl;
 import org.apache.samza.job.model.JobModel;
 import org.apache.samza.operators.KV;
@@ -96,12 +97,14 @@ public class OperatorImplGraph {
    */
   public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, TaskContext context, Clock clock) {
     this.clock = clock;
-
+    StreamConfig streamConfig = new StreamConfig(config);
     TaskContextImpl taskContext = (TaskContextImpl) context;
-    Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(specGraph) ?
-        getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()),
-            getIntermediateToInputStreamsMap(specGraph)) :
-        Collections.EMPTY_MAP;
+    Map<SystemStream, Integer> producerTaskCounts =
+        hasIntermediateStreams(specGraph)
+            ? getProducerTaskCountForIntermediateStreams(
+                getStreamToConsumerTasks(taskContext.getJobModel()),
+                getIntermediateToInputStreamsMap(specGraph, streamConfig))
+            : Collections.EMPTY_MAP;
     producerTaskCounts.forEach((stream, count) -> {
         LOG.info("{} has {} producer tasks.", stream, count);
       });
@@ -113,8 +116,8 @@ public class OperatorImplGraph {
     taskContext.registerObject(WatermarkStates.class.getName(),
         new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts));

-    specGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> {
-        SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName());
+    specGraph.getInputOperators().forEach((streamId, inputOpSpec) -> {
+        SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
         InputOperatorImpl inputOperatorImpl =
             (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context);
         this.inputOperators.put(systemStream, inputOperatorImpl);
@@ -210,6 +213,7 @@ public class OperatorImplGraph {
    */
   OperatorImpl createOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec,
       Config config, TaskContext context) {
+    StreamConfig streamConfig = new StreamConfig(config);
     if (operatorSpec instanceof InputOperatorSpec) {
       return new InputOperatorImpl((InputOperatorSpec) operatorSpec);
     } else if (operatorSpec instanceof StreamOperatorSpec) {
@@ -217,9 +221,13 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SinkOperatorSpec) {
       return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof OutputOperatorSpec) {
-      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec);
+      String streamId = ((OutputOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, systemStream);
     } else if (operatorSpec instanceof PartitionByOperatorSpec) {
-      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, config, context);
+      String streamId = ((PartitionByOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, systemStream, context);
     } else if (operatorSpec instanceof WindowOperatorSpec) {
       return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock);
     } else if (operatorSpec instanceof JoinOperatorSpec) {
@@ -231,7 +239,9 @@ public class OperatorImplGraph {
     } else if (operatorSpec instanceof SendToTableOperatorSpec) {
       return new SendToTableOperatorImpl((SendToTableOperatorSpec) operatorSpec, config, context);
     } else if (operatorSpec instanceof BroadcastOperatorSpec) {
-      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, context);
+      String streamId = ((BroadcastOperatorSpec) operatorSpec).getOutputStream().getStreamId();
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(streamId);
+      return new BroadcastOperatorImpl((BroadcastOperatorSpec) operatorSpec, systemStream, context);
     }
     throw new IllegalArgumentException(
         String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName()));
@@ -323,10 +333,6 @@ public class OperatorImplGraph {
       };
   }

-  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
-    return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
-  }
-
   /**
    * calculate the task count that produces to each intermediate streams
    * @param streamToConsumerTasks input streams to task mapping
@@ -337,12 +343,11 @@ public class OperatorImplGraph {
       Multimap<SystemStream, String> streamToConsumerTasks,
       Multimap<SystemStream, SystemStream> intermediateToInputStreams) {
     Map<SystemStream, Integer> result = new HashMap<>();
-    intermediateToInputStreams.asMap().entrySet().forEach(entry -> {
+    intermediateToInputStreams.asMap().entrySet().forEach(entry ->
         result.put(entry.getKey(),
             entry.getValue().stream()
                 .flatMap(systemStream -> streamToConsumerTasks.get(systemStream).stream())
-                .collect(Collectors.toSet()).size());
-      });
+                .collect(Collectors.toSet()).size()));
     return result;
   }

@@ -368,25 +373,34 @@ public class OperatorImplGraph {
    * @param specGraph the user {@link OperatorSpecGraph}
    * @return mapping from output streams to input streams
    */
-  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(OperatorSpecGraph specGraph) {
+  static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(
+      OperatorSpecGraph specGraph, StreamConfig streamConfig) {
     Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create();
     specGraph.getInputOperators().entrySet().stream()
-        .forEach(
-            entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams));
+        .forEach(entry -> {
+            SystemStream systemStream = streamConfig.streamIdToSystemStream(entry.getKey());
+            computeOutputToInput(systemStream, entry.getValue(), outputToInputStreams, streamConfig);
+          });
     return outputToInputStreams;
   }

   private static void computeOutputToInput(SystemStream input, OperatorSpec opSpec,
-      Multimap<SystemStream, SystemStream> outputToInputStreams) {
+      Multimap<SystemStream, SystemStream> outputToInputStreams, StreamConfig streamConfig) {
     if (opSpec instanceof PartitionByOperatorSpec) {
       PartitionByOperatorSpec spec = (PartitionByOperatorSpec) opSpec;
-      outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input);
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else if (opSpec instanceof BroadcastOperatorSpec) {
       BroadcastOperatorSpec spec = (BroadcastOperatorSpec) opSpec;
-      outputToInputStreams.put(spec.getOutputStream().getStreamSpec().toSystemStream(), input);
+      SystemStream systemStream = streamConfig.streamIdToSystemStream(spec.getOutputStream().getStreamId());
+      outputToInputStreams.put(systemStream, input);
     } else {
       Collection<OperatorSpec> nextOperators = opSpec.getRegisteredOperatorSpecs();
-      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams));
+      nextOperators.forEach(spec -> computeOutputToInput(input, spec, outputToInputStreams, streamConfig));
     }
   }
+
+  private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) {
+    return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet());
+  }
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
index e625484..22fbb1b 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java
@@ -42,10 +42,10 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> {
   private final OutputStreamImpl<M> outputStream;
   private final SystemStream systemStream;

-  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec) {
+  OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, SystemStream systemStream) {
     this.outputOpSpec = outputOpSpec;
     this.outputStream = outputOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
   }

   @Override
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
index dd64429..63e269d 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java
@@ -20,10 +20,8 @@ package org.apache.samza.operators.impl;

 import org.apache.samza.config.Config;
 import org.apache.samza.container.TaskContextImpl;
-import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.MapFunction;
 import org.apache.samza.operators.spec.OperatorSpec;
-import org.apache.samza.operators.spec.OutputStreamImpl;
 import org.apache.samza.operators.spec.PartitionByOperatorSpec;
 import org.apache.samza.system.ControlMessage;
 import org.apache.samza.system.EndOfStreamMessage;
@@ -51,10 +49,10 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   private final String taskName;
   private final ControlMessageSender controlMessageSender;

-  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) {
+  PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec,
+      SystemStream systemStream, TaskContext context) {
     this.partitionByOpSpec = partitionByOpSpec;
-    OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream();
-    this.systemStream = outputStream.getSystemStream();
+    this.systemStream = systemStream;
     this.keyFunction = partitionByOpSpec.getKeyFunction();
     this.valueFunction = partitionByOpSpec.getValueFunction();
     this.taskName = context.getTaskName().getTaskName();
@@ -102,7 +100,6 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> {
   }

   private void sendControlMessage(ControlMessage message, MessageCollector collector) {
-    SystemStream outputStream = partitionByOpSpec.getOutputStream().getSystemStream();
-    controlMessageSender.send(message, outputStream, collector);
+    controlMessageSender.send(message, systemStream, collector);
   }
 }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
index a636ac5..922a1f9 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java
@@ -22,7 +22,6 @@ import org.apache.samza.operators.KV;
 import org.apache.samza.operators.functions.TimerFunction;
 import org.apache.samza.operators.functions.WatermarkFunction;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;

 /**
  * The spec for an operator that receives incoming messages from an input stream
@@ -34,7 +33,7 @@ import org.apache.samza.system.StreamSpec;
 public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V

   private final boolean isKeyed;
-  private final StreamSpec streamSpec;
+  private final String streamId;

   /**
    * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the configs for a stream, and deserialized
@@ -43,17 +42,16 @@ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { //
   private transient final Serde<K> keySerde;
   private transient final Serde<V> valueSerde;

-  public InputOperatorSpec(StreamSpec streamSpec,
-      Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+  public InputOperatorSpec(String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
     super(OpCode.INPUT, opId);
-    this.streamSpec = streamSpec;
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }

-  public StreamSpec getStreamSpec() {
-    return this.streamSpec;
+  public String getStreamId() {
+    return this.streamId;
   }

   public Serde<K> getKeySerde() {
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
index 6e98d5a..9e788da 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java
@@ -28,7 +28,6 @@ import org.apache.samza.operators.functions.SinkFunction;
 import org.apache.samza.operators.functions.StreamTableJoinFunction;
 import org.apache.samza.operators.windows.internal.WindowInternal;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
 import org.apache.samza.table.TableSpec;


@@ -42,7 +41,7 @@ public class OperatorSpecs {
   /**
    * Creates an {@link InputOperatorSpec} for consuming input.
    *
-   * @param streamSpec the stream spec for the input stream
+   * @param streamId the stream id for the input stream
    * @param keySerde the serde for the input key
    * @param valueSerde the serde for the input value
    * @param isKeyed whether the input stream is keyed
@@ -52,8 +51,8 @@ public class OperatorSpecs {
    * @return the {@link InputOperatorSpec}
    */
   public static <K, V> InputOperatorSpec<K, V> createInputOperatorSpec(
-      StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
-    return new InputOperatorSpec<>(streamSpec, keySerde, valueSerde, isKeyed, opId);
+      String streamId, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) {
+    return new InputOperatorSpec<>(streamId, keySerde, valueSerde, isKeyed, opId);
   }

   /**
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
index fe0abcb..5d70e6f 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java
@@ -21,13 +21,10 @@ package org.apache.samza.operators.spec;

 import java.io.Serializable;
 import org.apache.samza.operators.OutputStream;
 import org.apache.samza.serializers.Serde;
-import org.apache.samza.system.StreamSpec;
-import org.apache.samza.system.SystemStream;
-

 public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
-  private final StreamSpec streamSpec;
+  private final String streamId;
   private final boolean isKeyed;

   /**
@@ -37,16 +34,15 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
   private transient final Serde keySerde;
   private transient final Serde valueSerde;

-  public OutputStreamImpl(StreamSpec streamSpec,
-      Serde keySerde, Serde valueSerde, boolean isKeyed) {
-    this.streamSpec = streamSpec;
+  public OutputStreamImpl(String streamId, Serde keySerde, Serde valueSerde, boolean isKeyed) {
+    this.streamId = streamId;
     this.keySerde = keySerde;
     this.valueSerde = valueSerde;
     this.isKeyed = isKeyed;
   }

-  public StreamSpec getStreamSpec() {
-    return streamSpec;
+  public String getStreamId() {
+    return streamId;
   }

   public Serde getKeySerde() {
@@ -57,10 +53,6 @@ public class OutputStreamImpl<M> implements OutputStream<M>, Serializable {
     return valueSerde;
   }

-  public SystemStream getSystemStream() {
-    return this.streamSpec.toSystemStream();
-  }
-
   public boolean isKeyed() {
     return isKeyed;
   }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
index 272ba63..3bb8713 100644
--- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
+++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java
@@ -24,7 +24,6 @@ import org.apache.samza.operators.StreamGraphSpec;
 import org.apache.samza.operators.spec.InputOperatorSpec;
 import org.apache.samza.operators.spec.OperatorSpec;
 import org.apache.samza.operators.spec.OutputStreamImpl;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -51,13 +50,13 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple
     this.outputStream = outputStream;
     if (inputOperatorSpec.isKeyed() != outputStream.isKeyed()) {
       LOGGER.error("Input and output streams for intermediate stream {} aren't keyed consistently. Input: {}, Output: {}",
-          new Object[]{inputOperatorSpec.getStreamSpec().getId(), inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
+          new Object[]{inputOperatorSpec.getStreamId(), inputOperatorSpec.isKeyed(), outputStream.isKeyed()});
     }
     this.isKeyed = inputOperatorSpec.isKeyed() && outputStream.isKeyed();
   }

-  public StreamSpec getStreamSpec() {
-    return this.outputStream.getStreamSpec();
+  public String getStreamId() {
+    return this.outputStream.getStreamId();
   }

   public OutputStreamImpl<M> getOutputStream() {
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
index 3716d2b..7cd19fb 100644
--- a/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
+++ b/samza-core/src/main/java/org/apache/samza/runtime/AbstractApplicationRunner.java
@@ -38,7 +38,6 @@ import org.apache.samza.execution.ExecutionPlanner;
 import org.apache.samza.execution.StreamManager;
 import org.apache.samza.operators.OperatorSpecGraph;
 import org.apache.samza.operators.StreamGraphSpec;
-import org.apache.samza.system.StreamSpec;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

@@ -56,58 +55,7 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {

   public AbstractApplicationRunner(Config config) {
     super(config);
-    this.graphSpec = new StreamGraphSpec(this, config);
-  }
-
-  @Override
-  public StreamSpec getStreamSpec(String streamId) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String physicalName = streamConfig.getPhysicalName(streamId);
-    return getStreamSpec(streamId, physicalName);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   * <br>
-   * All properties matching this pattern are assumed to be system-specific with one exception. The following
-   * property is a Samza property which is used to bind the stream to a system.
-   *
-   * <ul>
-   * <li>samza.system - The name of the System on which this stream will be used. If this property isn't defined
-   * the stream will be associated with the System defined in {@code job.default.system}</li>
-   * </ul>
-   *
-   * @param streamId The logical identifier for the stream in Samza.
-   * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   * @return The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    String system = streamConfig.getSystem(streamId);
-
-    return getStreamSpec(streamId, physicalName, system);
-  }
-
-  /**
-   * Constructs a {@link StreamSpec} from the configuration for the specified streamId.
-   *
-   * The stream configurations are read from the following properties in the config:
-   * {@code streams.{$streamId}.*}
-   *
-   * @param streamId The logical identifier for the stream in Samza.
-   * @param physicalName The system-specific name for this stream. It could be a file URN, topic name, or other identifer.
-   * @param system The name of the System on which this stream will be used.
-   * @return The {@link StreamSpec} instance.
-   */
-  /*package private*/ StreamSpec getStreamSpec(String streamId, String physicalName, String system) {
-    StreamConfig streamConfig = new StreamConfig(config);
-    Map<String, String> properties = streamConfig.getStreamProperties(streamId);
-    boolean isBounded = streamConfig.getIsBounded(streamId);
-
-    return new StreamSpec(streamId, physicalName, system, isBounded, properties);
+    this.graphSpec = new StreamGraphSpec(config);
   }
 
   public ExecutionPlan getExecutionPlan(StreamApplication app, StreamManager streamManager) throws Exception {
@@ -118,20 +66,22 @@ public abstract class AbstractApplicationRunner extends ApplicationRunner {
   ExecutionPlan getExecutionPlan(StreamApplication app, String runId, StreamManager streamManager) throws Exception {
     // build stream graph
     app.init(graphSpec, config);
-
     OperatorSpecGraph specGraph = graphSpec.getOperatorSpecGraph();
-    // create the physical execution plan
+
+    // update application configs
     Map<String, String> cfg = new HashMap<>(config);
     if (StringUtils.isNoneEmpty(runId)) {
       cfg.put(ApplicationConfig.APP_RUN_ID, runId);
     }
 
-    Set<StreamSpec> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
+    StreamConfig streamConfig = new StreamConfig(config);
+    Set<String> inputStreams = new HashSet<>(specGraph.getInputOperators().keySet());
     inputStreams.removeAll(specGraph.getOutputStreams().keySet());
-    ApplicationMode mode = inputStreams.stream().allMatch(StreamSpec::isBounded)
+    ApplicationMode mode = inputStreams.stream().allMatch(streamConfig::getIsBounded)
         ? ApplicationMode.BATCH : ApplicationMode.STREAM;
     cfg.put(ApplicationConfig.APP_MODE, mode.name());
 
+    // create the physical execution plan
     ExecutionPlanner planner = new ExecutionPlanner(new MapConfig(cfg), streamManager);
     return planner.plan(specGraph);
   }
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
index 6aeb2ba..ea55fe5 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/ChangelogStreamManager.java
@@ -34,7 +34,7 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager;
 import org.apache.samza.system.StreamSpec;
 import org.apache.samza.system.SystemAdmin;
 import org.apache.samza.system.SystemStream;
-import org.apache.samza.util.Util;
+import org.apache.samza.util.StreamUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -113,7 +113,7 @@ public class ChangelogStreamManager {
         .stream()
         .filter(name -> StringUtils.isNotBlank(storageConfig.getChangelogStream(name)))
         .collect(Collectors.toMap(name -> name,
-            name -> Util.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
+            name -> StreamUtil.getSystemStreamFromNames(storageConfig.getChangelogStream(name))));
 
     // Get SystemAdmin for changelog store's system and attempt to create the stream
     JavaSystemConfig systemConfig = new JavaSystemConfig(config);
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
index c807b02..f9c6c0c 100644
--- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
+++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java
@@ -52,6 +52,7 @@ import org.apache.samza.system.SystemStreamPartition;
 import org.apache.samza.util.Clock;
 import org.apache.samza.util.CommandLine;
 import org.apache.samza.util.ScalaJavaUtil;
+import org.apache.samza.util.StreamUtil;
 import org.apache.samza.util.SystemClock;
 import org.apache.samza.util.Util;
 import org.slf4j.Logger;
@@ -151,7 +152,7 @@ public class StorageRecovery extends CommandLine {
       log.info("stream name for " + storeName + " is " + streamName);
 
       if (streamName != null) {
-        changeLogSystemStreams.put(storeName, Util.getSystemStreamFromNames(streamName));
+        changeLogSystemStreams.put(storeName, StreamUtil.getSystemStreamFromNames(streamName));
       }
 
       String factoryClass = config.getStorageFactoryClassName(storeName);
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
----------------------------------------------------------------------
diff --git a/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
new file mode 100644
index 0000000..e7a1e54
--- /dev/null
+++ b/samza-core/src/main/java/org/apache/samza/util/StreamUtil.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.samza.util;
+
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.samza.SamzaException;
+import org.apache.samza.config.Config;
+import org.apache.samza.config.StreamConfig;
+import org.apache.samza.system.StreamSpec;
+import org.apache.samza.system.SystemStream;
+
+public class StreamUtil {
+  /**
+   * Gets the {@link SystemStream} corresponding to the provided stream, which may be
+   * a streamId, or stream name of the format systemName.streamName.
+   *
+   * @param stream the stream name or id to get the {@link SystemStream} for.
+   * @return the {@link SystemStream} for the stream
+   */
+  public static SystemStream getSystemStreamFromNameOrId(Config config, String stream) {
+    String[] parts = stream.split("\\.");
+    if (parts.length == 0 || parts.length > 2) {
+      throw new SamzaException(
+          String.format("Invalid stream %s. Expected to be of the format streamId or systemName.streamName", stream));
+    }
+    if (parts.length == 1) {
+      return new StreamConfig(config).streamIdToSystemStream(stream);
+    } else {
+      return new SystemStream(parts[0], parts[1]);
+    }
+  }
+
+  /**
+   * Returns a SystemStream object based on the system stream name given. For
+   * example, kafka.topic would return SystemStream("kafka", "topic").
+   *
+   * @param systemStreamName name of the system stream
+   * @return the {@link SystemStream} for the {@code systemStreamName}
+   */
+  public static SystemStream getSystemStreamFromNames(String systemStreamName) {
+    int idx = systemStreamName.indexOf('.');
+    if (idx < 0) {
+      throw new IllegalArgumentException("No '.' in stream name '" + systemStreamName
+          + "'. Stream names should be in the form 'system.stream'");
+    }
+    return new SystemStream(
+        systemStreamName.substring(0, idx),
+        systemStreamName.substring(idx + 1, systemStreamName.length()));
+  }
+
+  /**
+   * Returns the period separated system stream name for the provided {@code systemStream}. For
+   * example, SystemStream("kafka", "topic") would return "kafka.topic".
+   *
+   * @param systemStream the {@link SystemStream} to get the name for
+   * @return the system stream name
+   */
+  public static String getNameFromSystemStream(SystemStream systemStream) {
+    return systemStream.getSystem() + "." + systemStream.getStream();
+  }
+
+  public static Set<StreamSpec> getStreamSpecs(Set<String> streamIds, StreamConfig streamConfig) {
+    return streamIds.stream().map(streamId -> getStreamSpec(streamId, streamConfig)).collect(Collectors.toSet());
+  }
+
+  public static StreamSpec getStreamSpec(String streamId, StreamConfig streamConfig) {
+    String physicalName = streamConfig.getPhysicalName(streamId);
+    String system = streamConfig.getSystem(streamId);
+    Map<String, String> streamProperties = streamConfig.getStreamProperties(streamId);
+    return new StreamSpec(streamId, physicalName, system, streamProperties);
+  }
+}
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
index c9df3b5..42b6130 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/StorageConfig.scala
@@ -22,8 +22,7 @@ package org.apache.samza.config
 import java.util.concurrent.TimeUnit
 
 import scala.collection.JavaConverters._
-import org.apache.samza.util.Logging
-import org.apache.samza.util.Util
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object StorageConfig {
   // stream config constants
@@ -106,7 +105,7 @@ class StorageConfig(config: Config) extends ScalaMapConfig(config) with Logging
       .map(getChangelogStream(_))
       .filter(_.isDefined)
       // Convert "system.stream" to systemName
-      .map(systemStreamName => Util.getSystemStreamFromNames(systemStreamName.get).getSystem)
+      .map(systemStreamName => StreamUtil.getSystemStreamFromNames(systemStreamName.get).getSystem)
       .contains(systemName)
   }
 
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
index ab11785..a64589f 100644
--- a/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
+++ b/samza-core/src/main/scala/org/apache/samza/config/TaskConfig.scala
@@ -22,7 +22,7 @@ package org.apache.samza.config
 import org.apache.samza.checkpoint.CheckpointManager
 import org.apache.samza.metrics.MetricsRegistry
 import org.apache.samza.system.SystemStream
-import org.apache.samza.util.{Logging, Util}
+import org.apache.samza.util.{Logging, StreamUtil}
 
 object TaskConfig {
   // task config constants
@@ -78,7 +78,7 @@ class TaskConfig(config: Config) extends ScalaMapConfig(config) with Logging {
   def getInputStreams = getOption(TaskConfig.INPUT_STREAMS) match {
     case Some(streams) => if (streams.length > 0) {
       streams.split(",").map(systemStreamNames => {
-        Util.getSystemStreamFromNames(systemStreamNames.trim)
+        StreamUtil.getSystemStreamFromNames(systemStreamNames.trim)
       }).toSet
     } else {
       Set[SystemStream]()
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
index 35802ac..bb1b1cf 100644
--- a/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
+++ b/samza-core/src/main/scala/org/apache/samza/container/SamzaContainer.scala
@@ -326,7 +326,7 @@ object SamzaContainer extends Logging {
       .getStoreNames
       .filter(config.getChangelogStream(_).isDefined)
       .map(name => (name, config.getChangelogStream(name).get)).toMap
-      .mapValues(Util.getSystemStreamFromNames(_))
+      .mapValues(StreamUtil.getSystemStreamFromNames(_))
 
     info("Got change log system streams: %s" format changeLogSystemStreams)
 
@@ -357,7 +357,7 @@ object SamzaContainer extends Logging {
     val sideInputStoresToSystemStreams = config.getStoreNames
       .map { storeName => (storeName, config.getSideInputs(storeName)) }
       .filter { case (storeName, sideInputs) => sideInputs.nonEmpty }
-      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(Util.getSystemStreamFromNameOrId(config, _))) }
+      .map { case (storeName, sideInputs) => (storeName, sideInputs.map(StreamUtil.getSystemStreamFromNameOrId(config, _))) }
       .toMap
 
     info("Got side input store system streams: %s" format sideInputStoresToSystemStreams)
http://git-wip-us.apache.org/repos/asf/samza/blob/440a25c9/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
----------------------------------------------------------------------
diff --git a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
index 7b83874..0b472aa 100644
--- a/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
+++ b/samza-core/src/main/scala/org/apache/samza/job/local/ThreadJobFactory.scala
@@ -28,7 +28,6 @@ import org.apache.samza.coordinator.stream.CoordinatorStreamManager
 import org.apache.samza.job.{StreamJob, StreamJobFactory}
 import org.apache.samza.metrics.{JmxServer, MetricsRegistryMap, MetricsReporter}
 import org.apache.samza.operators.StreamGraphSpec
-import org.apache.samza.runtime.LocalContainerRunner
 import org.apache.samza.storage.ChangelogStreamManager
 import org.apache.samza.task.TaskFactoryUtil
 import org.apache.samza.util.Logging
@@ -72,10 +71,9 @@ class ThreadJobFactory extends StreamJobFactory with Logging {
     val containerId = "0"
     val jmxServer = new JmxServer
     val streamApp = TaskFactoryUtil.createStreamApplication(config)
-    val appRunner = new LocalContainerRunner(jobModel, "0")
 
     val taskFactory = if (streamApp != null) {
-      val graphSpec = new StreamGraphSpec(appRunner, config)
+      val graphSpec = new StreamGraphSpec(config)
       streamApp.init(graphSpec, config)
       TaskFactoryUtil.createTaskFactory(graphSpec.getOperatorSpecGraph(), graphSpec.getContextManager)
     } else {
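
For readers tracing the move of the "system.stream" helpers from Util into the new StreamUtil class, the parsing rule behind StreamUtil.getSystemStreamFromNames and StreamUtil.getNameFromSystemStream can be sketched without Samza on the classpath. This is a hypothetical illustration under that assumption, not Samza code: SystemStreamNameSketch and SimpleSystemStream are made-up names, and SimpleSystemStream only stands in for org.apache.samza.system.SystemStream.

```java
import java.util.Objects;

/**
 * Hypothetical, dependency-free sketch of the "system.stream" name handling
 * that the diff moves into StreamUtil. Not part of Samza.
 */
public class SystemStreamNameSketch {

  /** Illustrative stand-in for org.apache.samza.system.SystemStream (assumption). */
  public static final class SimpleSystemStream {
    private final String system;
    private final String stream;

    public SimpleSystemStream(String system, String stream) {
      this.system = system;
      this.stream = stream;
    }

    public String getSystem() {
      return system;
    }

    public String getStream() {
      return stream;
    }

    @Override
    public boolean equals(Object o) {
      if (!(o instanceof SimpleSystemStream)) {
        return false;
      }
      SimpleSystemStream other = (SimpleSystemStream) o;
      return system.equals(other.system) && stream.equals(other.stream);
    }

    @Override
    public int hashCode() {
      return Objects.hash(system, stream);
    }
  }

  /**
   * Mirrors the rule in StreamUtil.getSystemStreamFromNames: split on the FIRST '.',
   * so "kafka.my.topic" becomes system "kafka" and stream "my.topic".
   */
  public static SimpleSystemStream fromNames(String systemStreamName) {
    int idx = systemStreamName.indexOf('.');
    if (idx < 0) {
      throw new IllegalArgumentException("No '.' in stream name '" + systemStreamName
          + "'. Stream names should be in the form 'system.stream'");
    }
    // substring(idx + 1) is equivalent to substring(idx + 1, length()) used in the diff.
    return new SimpleSystemStream(systemStreamName.substring(0, idx), systemStreamName.substring(idx + 1));
  }

  /** Mirrors StreamUtil.getNameFromSystemStream: joins system and stream with a period. */
  public static String toName(SimpleSystemStream systemStream) {
    return systemStream.getSystem() + "." + systemStream.getStream();
  }

  public static void main(String[] args) {
    SimpleSystemStream ss = fromNames("kafka.topic");
    System.out.println(ss.getSystem() + " / " + ss.getStream()); // prints "kafka / topic"
    System.out.println(toName(ss)); // prints "kafka.topic"
  }
}
```

One detail visible in the diff itself: getSystemStreamFromNames splits on the first period only, so "kafka.my.topic" yields the stream "my.topic", whereas getSystemStreamFromNameOrId splits on every period and throws a SamzaException for names with more than two parts.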
