http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java deleted file mode 100644 index 7ddcd19..0000000 --- a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphImpl.java +++ /dev/null @@ -1,328 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.samza.operators; - -import java.util.Collection; -import java.util.Collections; -import java.util.HashSet; -import java.util.LinkedHashMap; -import java.util.Map; -import java.util.Set; -import java.util.regex.Pattern; -import java.util.stream.Collectors; - -import org.apache.commons.lang3.StringUtils; -import org.apache.samza.SamzaException; -import org.apache.samza.config.Config; -import org.apache.samza.config.JobConfig; -import org.apache.samza.operators.spec.InputOperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec; -import org.apache.samza.operators.spec.OperatorSpec.OpCode; -import org.apache.samza.operators.spec.OperatorSpecs; -import org.apache.samza.operators.spec.OutputStreamImpl; -import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; -import org.apache.samza.runtime.ApplicationRunner; -import org.apache.samza.serializers.KVSerde; -import org.apache.samza.serializers.NoOpSerde; -import org.apache.samza.serializers.Serde; -import org.apache.samza.system.StreamSpec; -import org.apache.samza.table.Table; -import org.apache.samza.table.TableSpec; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.base.Preconditions; - -/** - * A {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to - * create the DAG of transforms. - */ -public class StreamGraphImpl implements StreamGraph { - private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphImpl.class); - private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); - - // We use a LHM for deterministic order in initializing and closing operators. 
- private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); - private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); - private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); - private final ApplicationRunner runner; - private final Config config; - - - /** - * The 0-based position of the next operator in the graph. - * Part of the unique ID for each OperatorSpec in the graph. - * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}. - */ - private int nextOpNum = 0; - private final Set<String> operatorIds = new HashSet<>(); - private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde()); - private ContextManager contextManager = null; - - public StreamGraphImpl(ApplicationRunner runner, Config config) { - // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphImpl once Systems - // can use streamId to send and receive messages. - this.runner = runner; - this.config = config; - } - - @Override - public void setDefaultSerde(Serde<?> serde) { - Preconditions.checkNotNull(serde, "Default serde must not be null"); - Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(), - "Default serde must be set before creating any input or output streams."); - this.defaultSerde = serde; - } - - @Override - public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) { - StreamSpec streamSpec = runner.getStreamSpec(streamId); - Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); - Preconditions.checkNotNull(serde, "serde must not be null for an input stream."); - Preconditions.checkState(!inputOperators.containsKey(streamSpec), - "getInputStream must not be called multiple times with the same streamId: " + streamId); - - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - if (outputStreams.containsKey(streamSpec)) { - OutputStreamImpl outputStream = 
outputStreams.get(streamSpec); - Serde keySerde = outputStream.getKeySerde(); - Serde valueSerde = outputStream.getValueSerde(); - Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), - String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at " - + "stream level, so the same key and message Serde must be used for both.", streamId)); - } - - boolean isKeyed = serde instanceof KVSerde; - InputOperatorSpec inputOperatorSpec = - OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), - isKeyed, this.getNextOpId(OpCode.INPUT, null)); - inputOperators.put(streamSpec, inputOperatorSpec); - return new MessageStreamImpl<>(this, inputOperators.get(streamSpec)); - } - - @Override - public <M> MessageStream<M> getInputStream(String streamId) { - return (MessageStream<M>) getInputStream(streamId, defaultSerde); - } - - @Override - public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) { - StreamSpec streamSpec = runner.getStreamSpec(streamId); - Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); - Preconditions.checkNotNull(serde, "serde must not be null for an output stream."); - Preconditions.checkState(!outputStreams.containsKey(streamSpec), - "getOutputStream must not be called multiple times with the same streamId: " + streamId); - - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - if (inputOperators.containsKey(streamSpec)) { - InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec); - Serde keySerde = inputOperatorSpec.getKeySerde(); - Serde valueSerde = inputOperatorSpec.getValueSerde(); - Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), - String.format("Stream %s is being used both as an input and an output stream. 
Serde in Samza happens at " - + "stream level, so the same key and message Serde must be used for both.", streamId)); - } - - boolean isKeyed = serde instanceof KVSerde; - outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); - return outputStreams.get(streamSpec); - } - - @Override - public <M> OutputStream<M> getOutputStream(String streamId) { - return (OutputStream<M>) getOutputStream(streamId, defaultSerde); - } - - @Override - public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) { - TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); - if (tables.containsKey(tableSpec)) { - throw new IllegalStateException(String.format( - "getTable() invoked multiple times with the same tableId: %s", - tableDesc.getTableId())); - } - tables.put(tableSpec, new TableImpl(tableSpec)); - return tables.get(tableSpec); - } - - @Override - public StreamGraph withContextManager(ContextManager contextManager) { - this.contextManager = contextManager; - return this; - } - - /** - * See {@link StreamGraphImpl#getIntermediateStream(String, Serde, boolean)}. - */ - <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) { - return getIntermediateStream(streamId, serde, false); - } - - /** - * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. - * An intermediate {@link MessageStream} is both an output and an input stream. - * - * @param streamId the id of the stream to be created. - * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde - * is used. - * @param isBroadcast whether the stream is a broadcast stream. - * @param <M> the type of messages in the intermediate {@link MessageStream} - * @return the intermediate {@link MessageStreamImpl} - * - * TODO: once SAMZA-1566 is resolved, we should be able to pass in the StreamSpec directly. 
- */ - <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { - StreamSpec streamSpec = runner.getStreamSpec(streamId); - if (isBroadcast) { - streamSpec = streamSpec.copyWithBroadCast(); - } - - Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec), - "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); - - if (serde == null) { - LOGGER.info("Using default serde for intermediate stream: " + streamId); - serde = (Serde<M>) defaultSerde; - } - - boolean isKeyed = serde instanceof KVSerde; - KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); - InputOperatorSpec inputOperatorSpec = - OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), - isKeyed, this.getNextOpId(OpCode.INPUT, null)); - inputOperators.put(streamSpec, inputOperatorSpec); - outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); - return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec)); - } - - public Map<StreamSpec, InputOperatorSpec> getInputOperators() { - return Collections.unmodifiableMap(inputOperators); - } - - public Map<StreamSpec, OutputStreamImpl> getOutputStreams() { - return Collections.unmodifiableMap(outputStreams); - } - - public Map<TableSpec, TableImpl> getTables() { - return Collections.unmodifiableMap(tables); - } - - public ContextManager getContextManager() { - return this.contextManager; - } - - /** - * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: - * jobName-jobId-opCode-(userDefinedId|nextOpNum); - * - * @param opCode the {@link OpCode} of the next operator - * @param userDefinedId the optional user-provided name of the next operator or null - * @return the unique ID for the next operator in the graph - */ - /* package private */ String getNextOpId(OpCode opCode, String userDefinedId) { - if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) { - throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId); - } - - String nextOpId = String.format("%s-%s-%s-%s", - config.get(JobConfig.JOB_NAME()), - config.get(JobConfig.JOB_ID(), "1"), - opCode.name().toLowerCase(), - StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum)); - if (!operatorIds.add(nextOpId)) { - throw new SamzaException( - String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId)); - } - nextOpNum++; - return nextOpId; - } - - /** - * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: - * jobName-jobId-opCode-nextOpNum; - * - * @param opCode the {@link OpCode} of the next operator - * @return the unique ID for the next operator in the graph - */ - /* package private */ String getNextOpId(OpCode opCode) { - return getNextOpId(opCode, null); - } - - /** - * Get all {@link OperatorSpec}s available in this {@link StreamGraphImpl} - * - * @return all available {@link OperatorSpec}s - */ - public Collection<OperatorSpec> getAllOperatorSpecs() { - Collection<InputOperatorSpec> inputOperatorSpecs = inputOperators.values(); - Set<OperatorSpec> operatorSpecs = new HashSet<>(); - for (InputOperatorSpec inputOperatorSpec: inputOperatorSpecs) { - operatorSpecs.add(inputOperatorSpec); - doGetOperatorSpecs(inputOperatorSpec, operatorSpecs); - } - return operatorSpecs; - } - - private void doGetOperatorSpecs(OperatorSpec operatorSpec, Set<OperatorSpec> specs) { - Collection<OperatorSpec> registeredOperatorSpecs = operatorSpec.getRegisteredOperatorSpecs(); - for (OperatorSpec registeredOperatorSpec: registeredOperatorSpecs) { - specs.add(registeredOperatorSpec); - doGetOperatorSpecs(registeredOperatorSpec, specs); - } - } - - /** - * Returns <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator - * - * @return <tt>true</tt> iff this {@link StreamGraphImpl} contains a join or a window operator - */ - public boolean hasWindowOrJoins() { - // Obtain the operator specs from the streamGraph - Collection<OperatorSpec> operatorSpecs = getAllOperatorSpecs(); - - Set<OperatorSpec> windowOrJoinSpecs = operatorSpecs.stream() - .filter(spec -> spec.getOpCode() == OperatorSpec.OpCode.WINDOW || spec.getOpCode() == OperatorSpec.OpCode.JOIN) - .collect(Collectors.toSet()); - - return windowOrJoinSpecs.size() != 0; - } - - private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { - Serde keySerde, valueSerde; - - if (serde instanceof KVSerde) { - keySerde = ((KVSerde) serde).getKeySerde(); - valueSerde = 
((KVSerde) serde).getValueSerde(); - } else { - keySerde = new NoOpSerde(); - valueSerde = serde; - } - - if (keySerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + - ". Keys will not be (de)serialized"); - } - if (valueSerde instanceof NoOpSerde) { - LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + - ". Values will not be (de)serialized"); - } - - return KV.of(keySerde, valueSerde); - } -}
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java new file mode 100644 index 0000000..ea9690b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/StreamGraphSpec.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators; + +import java.util.Collections; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.Map; +import java.util.Set; +import java.util.regex.Pattern; + +import org.apache.commons.lang3.StringUtils; +import org.apache.samza.SamzaException; +import org.apache.samza.config.Config; +import org.apache.samza.config.JobConfig; +import org.apache.samza.operators.spec.InputOperatorSpec; +import org.apache.samza.operators.spec.OperatorSpec.OpCode; +import org.apache.samza.operators.spec.OperatorSpecs; +import org.apache.samza.operators.spec.OutputStreamImpl; +import org.apache.samza.operators.stream.IntermediateMessageStreamImpl; +import org.apache.samza.runtime.ApplicationRunner; +import org.apache.samza.serializers.KVSerde; +import org.apache.samza.serializers.NoOpSerde; +import org.apache.samza.serializers.Serde; +import org.apache.samza.system.StreamSpec; +import org.apache.samza.table.Table; +import org.apache.samza.table.TableSpec; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * This class defines: + * 1) an implementation of {@link StreamGraph} that provides APIs for accessing {@link MessageStream}s to be used to + * create the DAG of transforms. + * 2) a builder that creates a serializable {@link OperatorSpecGraph} from user-defined DAG + */ +public class StreamGraphSpec implements StreamGraph { + private static final Logger LOGGER = LoggerFactory.getLogger(StreamGraphSpec.class); + private static final Pattern USER_DEFINED_ID_PATTERN = Pattern.compile("[\\d\\w-_.]+"); + + // We use a LHM for deterministic order in initializing and closing operators. 
+ private final Map<StreamSpec, InputOperatorSpec> inputOperators = new LinkedHashMap<>(); + private final Map<StreamSpec, OutputStreamImpl> outputStreams = new LinkedHashMap<>(); + private final Map<TableSpec, TableImpl> tables = new LinkedHashMap<>(); + private final ApplicationRunner runner; + private final Config config; + + /** + * The 0-based position of the next operator in the graph. + * Part of the unique ID for each OperatorSpec in the graph. + * Should only accessed and incremented via {@link #getNextOpId(OpCode, String)}. + */ + private int nextOpNum = 0; + private final Set<String> operatorIds = new HashSet<>(); + private Serde<?> defaultSerde = new KVSerde(new NoOpSerde(), new NoOpSerde()); + private ContextManager contextManager = null; + + public StreamGraphSpec(ApplicationRunner runner, Config config) { + // TODO: SAMZA-1118 - Move StreamSpec and ApplicationRunner out of StreamGraphSpec once Systems + // can use streamId to send and receive messages. + this.runner = runner; + this.config = config; + } + + @Override + public void setDefaultSerde(Serde<?> serde) { + Preconditions.checkNotNull(serde, "Default serde must not be null"); + Preconditions.checkState(inputOperators.isEmpty() && outputStreams.isEmpty(), + "Default serde must be set before creating any input or output streams."); + this.defaultSerde = serde; + } + + @Override + public <M> MessageStream<M> getInputStream(String streamId, Serde<M> serde) { + StreamSpec streamSpec = runner.getStreamSpec(streamId); + Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); + Preconditions.checkNotNull(serde, "serde must not be null for an input stream."); + Preconditions.checkState(!inputOperators.containsKey(streamSpec), + "getInputStream must not be called multiple times with the same streamId: " + streamId); + + KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + if (outputStreams.containsKey(streamSpec)) { + OutputStreamImpl outputStream = 
outputStreams.get(streamSpec); + Serde keySerde = outputStream.getKeySerde(); + Serde valueSerde = outputStream.getValueSerde(); + Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), + String.format("Stream %s is being used both as an input and an output stream. Serde in Samza happens at " + + "stream level, so the same key and message Serde must be used for both.", streamId)); + } + + boolean isKeyed = serde instanceof KVSerde; + InputOperatorSpec inputOperatorSpec = + OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), + isKeyed, this.getNextOpId(OpCode.INPUT, null)); + inputOperators.put(streamSpec, inputOperatorSpec); + return new MessageStreamImpl<>(this, inputOperators.get(streamSpec)); + } + + @Override + public <M> MessageStream<M> getInputStream(String streamId) { + return (MessageStream<M>) getInputStream(streamId, defaultSerde); + } + + @Override + public <M> OutputStream<M> getOutputStream(String streamId, Serde<M> serde) { + StreamSpec streamSpec = runner.getStreamSpec(streamId); + Preconditions.checkState(streamSpec != null, "No StreamSpec found for streamId: " + streamId); + Preconditions.checkNotNull(serde, "serde must not be null for an output stream."); + Preconditions.checkState(!outputStreams.containsKey(streamSpec), + "getOutputStream must not be called multiple times with the same streamId: " + streamId); + + KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + if (inputOperators.containsKey(streamSpec)) { + InputOperatorSpec inputOperatorSpec = inputOperators.get(streamSpec); + Serde keySerde = inputOperatorSpec.getKeySerde(); + Serde valueSerde = inputOperatorSpec.getValueSerde(); + Preconditions.checkState(kvSerdes.getKey().equals(keySerde) && kvSerdes.getValue().equals(valueSerde), + String.format("Stream %s is being used both as an input and an output stream. 
Serde in Samza happens at " + + "stream level, so the same key and message Serde must be used for both.", streamId)); + } + + boolean isKeyed = serde instanceof KVSerde; + outputStreams.put(streamSpec, new OutputStreamImpl<>(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), isKeyed)); + return outputStreams.get(streamSpec); + } + + @Override + public <M> OutputStream<M> getOutputStream(String streamId) { + return (OutputStream<M>) getOutputStream(streamId, defaultSerde); + } + + @Override + public <K, V> Table<KV<K, V>> getTable(TableDescriptor<K, V, ?> tableDesc) { + TableSpec tableSpec = ((BaseTableDescriptor) tableDesc).getTableSpec(); + if (tables.containsKey(tableSpec)) { + throw new IllegalStateException(String.format( + "getTable() invoked multiple times with the same tableId: %s", + tableDesc.getTableId())); + } + tables.put(tableSpec, new TableImpl(tableSpec)); + return tables.get(tableSpec); + } + + @Override + public StreamGraph withContextManager(ContextManager contextManager) { + this.contextManager = contextManager; + return this; + } + + public ContextManager getContextManager() { + return this.contextManager; + } + + public OperatorSpecGraph getOperatorSpecGraph() { + return new OperatorSpecGraph(this); + } + + /** + * Gets the unique ID for the next operator in the graph. 
The ID is of the following format: + * jobName-jobId-opCode-(userDefinedId|nextOpNum); + * + * @param opCode the {@link OpCode} of the next operator + * @param userDefinedId the optional user-provided name of the next operator or null + * @return the unique ID for the next operator in the graph + */ + public String getNextOpId(OpCode opCode, String userDefinedId) { + if (StringUtils.isNotBlank(userDefinedId) && !USER_DEFINED_ID_PATTERN.matcher(userDefinedId).matches()) { + throw new SamzaException("Operator ID must not contain spaces and special characters: " + userDefinedId); + } + + String nextOpId = String.format("%s-%s-%s-%s", + config.get(JobConfig.JOB_NAME()), + config.get(JobConfig.JOB_ID(), "1"), + opCode.name().toLowerCase(), + StringUtils.isNotBlank(userDefinedId) ? userDefinedId.trim() : String.valueOf(nextOpNum)); + if (!operatorIds.add(nextOpId)) { + throw new SamzaException( + String.format("Found duplicate operator ID %s in the graph. Operator IDs must be unique.", nextOpId)); + } + nextOpNum++; + return nextOpId; + } + + /** + * Gets the unique ID for the next operator in the graph. The ID is of the following format: + * jobName-jobId-opCode-nextOpNum; + * + * @param opCode the {@link OpCode} of the next operator + * @return the unique ID for the next operator in the graph + */ + public String getNextOpId(OpCode opCode) { + return getNextOpId(opCode, null); + } + + /** + * See {@link StreamGraphSpec#getIntermediateStream(String, Serde, boolean)}. + * + * @param <M> type of messages in the intermediate stream + * @param streamId the id of the stream to be created + * @param serde the {@link Serde} to use for messages in the intermediate stream. If null, the default serde is used. 
+ * @return the intermediate {@link MessageStreamImpl} + */ + @VisibleForTesting + public <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde) { + return getIntermediateStream(streamId, serde, false); + } + + /** + * Internal helper for {@link MessageStreamImpl} to add an intermediate {@link MessageStream} to the graph. + * An intermediate {@link MessageStream} is both an output and an input stream. + * + * @param streamId the id of the stream to be created. + * @param serde the {@link Serde} to use for the message in the intermediate stream. If null, the default serde + * is used. + * @param isBroadcast whether the stream is a broadcast stream. + * @param <M> the type of messages in the intermediate {@link MessageStream} + * @return the intermediate {@link MessageStreamImpl} + * + * TODO: once SAMZA-1566 is resolved, we should be able to pass in the StreamSpec directly. + */ + @VisibleForTesting + <M> IntermediateMessageStreamImpl<M> getIntermediateStream(String streamId, Serde<M> serde, boolean isBroadcast) { + StreamSpec streamSpec = runner.getStreamSpec(streamId); + if (isBroadcast) { + streamSpec = streamSpec.copyWithBroadCast(); + } + + Preconditions.checkState(!inputOperators.containsKey(streamSpec) && !outputStreams.containsKey(streamSpec), + "getIntermediateStream must not be called multiple times with the same streamId: " + streamId); + + if (serde == null) { + LOGGER.info("Using default serde for intermediate stream: " + streamId); + serde = (Serde<M>) defaultSerde; + } + + boolean isKeyed = serde instanceof KVSerde; + KV<Serde, Serde> kvSerdes = getKVSerdes(streamId, serde); + InputOperatorSpec inputOperatorSpec = + OperatorSpecs.createInputOperatorSpec(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), + isKeyed, this.getNextOpId(OpCode.INPUT, null)); + inputOperators.put(streamSpec, inputOperatorSpec); + outputStreams.put(streamSpec, new OutputStreamImpl(streamSpec, kvSerdes.getKey(), kvSerdes.getValue(), 
isKeyed)); + return new IntermediateMessageStreamImpl<>(this, inputOperators.get(streamSpec), outputStreams.get(streamSpec)); + } + + Map<StreamSpec, InputOperatorSpec> getInputOperators() { + return Collections.unmodifiableMap(inputOperators); + } + + Map<StreamSpec, OutputStreamImpl> getOutputStreams() { + return Collections.unmodifiableMap(outputStreams); + } + + Map<TableSpec, TableImpl> getTables() { + return Collections.unmodifiableMap(tables); + } + + private KV<Serde, Serde> getKVSerdes(String streamId, Serde serde) { + Serde keySerde, valueSerde; + + if (serde instanceof KVSerde) { + keySerde = ((KVSerde) serde).getKeySerde(); + valueSerde = ((KVSerde) serde).getValueSerde(); + } else { + keySerde = new NoOpSerde(); + valueSerde = serde; + } + + if (keySerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the key serde for stream " + streamId + + ". Keys will not be (de)serialized"); + } + if (valueSerde instanceof NoOpSerde) { + LOGGER.info("Using NoOpSerde as the value serde for stream " + streamId + + ". 
Values will not be (de)serialized"); + } + + return KV.of(keySerde, valueSerde); + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java index e671534..8ceada0 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/TableImpl.java @@ -18,6 +18,7 @@ */ package org.apache.samza.operators; +import java.io.Serializable; import org.apache.samza.table.Table; import org.apache.samza.table.TableSpec; @@ -25,7 +26,7 @@ import org.apache.samza.table.TableSpec; /** * This class is the holder of a {@link TableSpec} */ -public class TableImpl implements Table { +public class TableImpl implements Table, Serializable { private final TableSpec tableSpec; http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java index 269e7bc..8df670e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/BroadcastOperatorImpl.java @@ -42,7 +42,7 @@ class BroadcastOperatorImpl<M> extends OperatorImpl<M, Void> { BroadcastOperatorImpl(BroadcastOperatorSpec<M> broadcastOpSpec, TaskContext context) { this.broadcastOpSpec = broadcastOpSpec; - this.systemStream = broadcastOpSpec.getOutputStream().getStreamSpec().toSystemStream(); + this.systemStream = 
broadcastOpSpec.getOutputStream().getSystemStream(); this.taskName = context.getTaskName().getTaskName(); } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java index 608b2be..f0c0997 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java @@ -196,7 +196,7 @@ public abstract class OperatorImpl<M, RM> { results.forEach(rm -> this.registeredOperators.forEach(op -> - op.onMessage(rm, collector, coordinator))); + op.onMessage(rm, collector, coordinator))); WatermarkFunction watermarkFn = getOperatorSpec().getWatermarkFn(); if (watermarkFn != null) { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java index bbc8783..0f51798 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OperatorImplGraph.java @@ -25,7 +25,6 @@ import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.job.model.JobModel; import org.apache.samza.operators.KV; -import org.apache.samza.operators.StreamGraphImpl; import org.apache.samza.operators.TimerRegistry; import org.apache.samza.operators.functions.JoinFunction; import org.apache.samza.operators.functions.PartialJoinFunction; @@ -34,6 +33,7 
@@ import org.apache.samza.operators.spec.BroadcastOperatorSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.JoinOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; +import org.apache.samza.operators.OperatorSpecGraph; import org.apache.samza.operators.spec.OutputOperatorSpec; import org.apache.samza.operators.spec.PartitionByOperatorSpec; import org.apache.samza.operators.spec.SendToTableOperatorSpec; @@ -81,26 +81,26 @@ public class OperatorImplGraph { * the two {@link PartialJoinOperatorImpl}s for a {@link JoinOperatorSpec} with each other since they're * reached from different {@link OperatorSpec} during DAG traversals. */ - private final Map<String, KV<PartialJoinFunction, PartialJoinFunction>> joinFunctions = new HashMap<>(); + private final Map<String, KV<PartialJoinOperatorImpl, PartialJoinOperatorImpl>> joinOpImpls = new HashMap<>(); private final Clock clock; /** * Constructs the DAG of {@link OperatorImpl}s corresponding to the the DAG of {@link OperatorSpec}s - * in the {@code streamGraph}. + * in the {@code specGraph}. * - * @param streamGraph the {@link StreamGraphImpl} containing the logical {@link OperatorSpec} DAG + * @param specGraph the {@link OperatorSpecGraph} containing the logical {@link OperatorSpec} DAG * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators * @param clock the {@link Clock} to get current time */ - public OperatorImplGraph(StreamGraphImpl streamGraph, Config config, TaskContext context, Clock clock) { + public OperatorImplGraph(OperatorSpecGraph specGraph, Config config, TaskContext context, Clock clock) { this.clock = clock; TaskContextImpl taskContext = (TaskContextImpl) context; - Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(streamGraph) ? + Map<SystemStream, Integer> producerTaskCounts = hasIntermediateStreams(specGraph) ? 
getProducerTaskCountForIntermediateStreams(getStreamToConsumerTasks(taskContext.getJobModel()), - getIntermediateToInputStreamsMap(streamGraph)) : + getIntermediateToInputStreamsMap(specGraph)) : Collections.EMPTY_MAP; producerTaskCounts.forEach((stream, count) -> { LOG.info("{} has {} producer tasks.", stream, count); @@ -113,7 +113,7 @@ public class OperatorImplGraph { taskContext.registerObject(WatermarkStates.class.getName(), new WatermarkStates(context.getSystemStreamPartitions(), producerTaskCounts)); - streamGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> { + specGraph.getInputOperators().forEach((streamSpec, inputOpSpec) -> { SystemStream systemStream = new SystemStream(streamSpec.getSystemName(), streamSpec.getPhysicalName()); InputOperatorImpl inputOperatorImpl = (InputOperatorImpl) createAndRegisterOperatorImpl(null, inputOpSpec, systemStream, config, context); @@ -151,12 +151,13 @@ public class OperatorImplGraph { * creates the corresponding DAG of {@link OperatorImpl}s, and returns the root {@link OperatorImpl} node. 
* * @param prevOperatorSpec the parent of the current {@code operatorSpec} in the traversal - * @param operatorSpec the operatorSpec to create the {@link OperatorImpl} for + * @param operatorSpec the {@link OperatorSpec} to create the {@link OperatorImpl} for + * @param inputStream the source input stream that we traverse the {@link OperatorSpecGraph} from * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators * @return the operator implementation for the operatorSpec */ - OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, + private OperatorImpl createAndRegisterOperatorImpl(OperatorSpec prevOperatorSpec, OperatorSpec operatorSpec, SystemStream inputStream, Config config, TaskContext context) { if (!operatorImpls.containsKey(operatorSpec.getOpId()) || operatorSpec instanceof JoinOperatorSpec) { @@ -178,7 +179,9 @@ public class OperatorImplGraph { Collection<OperatorSpec> registeredSpecs = operatorSpec.getRegisteredOperatorSpecs(); registeredSpecs.forEach(registeredSpec -> { - OperatorImpl nextImpl = createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); + LOG.debug("Creating operator {} with opCode: {}", registeredSpec.getOpId(), registeredSpec.getOpCode()); + OperatorImpl nextImpl = + createAndRegisterOperatorImpl(operatorSpec, registeredSpec, inputStream, config, context); operatorImpl.registerNextOperator(nextImpl); }); return operatorImpl; @@ -199,7 +202,8 @@ public class OperatorImplGraph { /** * Creates a new {@link OperatorImpl} instance for the provided {@link OperatorSpec}. * - * @param operatorSpec the immutable {@link OperatorSpec} definition. 
+ * @param prevOperatorSpec the original {@link OperatorSpec} that produces output for {@code operatorSpec} from {@link OperatorSpecGraph} + * @param operatorSpec the original {@link OperatorSpec} from {@link OperatorSpecGraph} * @param config the {@link Config} required to instantiate operators * @param context the {@link TaskContext} required to instantiate operators * @return the {@link OperatorImpl} implementation instance @@ -209,17 +213,19 @@ public class OperatorImplGraph { if (operatorSpec instanceof InputOperatorSpec) { return new InputOperatorImpl((InputOperatorSpec) operatorSpec); } else if (operatorSpec instanceof StreamOperatorSpec) { - return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec, config, context); + return new StreamOperatorImpl((StreamOperatorSpec) operatorSpec); } else if (operatorSpec instanceof SinkOperatorSpec) { return new SinkOperatorImpl((SinkOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof OutputOperatorSpec) { - return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec, config, context); + return new OutputOperatorImpl((OutputOperatorSpec) operatorSpec); } else if (operatorSpec instanceof PartitionByOperatorSpec) { return new PartitionByOperatorImpl((PartitionByOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof WindowOperatorSpec) { return new WindowOperatorImpl((WindowOperatorSpec) operatorSpec, clock); } else if (operatorSpec instanceof JoinOperatorSpec) { - return createPartialJoinOperatorImpl(prevOperatorSpec, (JoinOperatorSpec) operatorSpec, config, context, clock); + return getOrCreatePartialJoinOpImpls((JoinOperatorSpec) operatorSpec, + prevOperatorSpec.equals(((JoinOperatorSpec) operatorSpec).getLeftInputOpSpec()), + config, context, clock); } else if (operatorSpec instanceof StreamTableJoinOperatorSpec) { return new StreamTableJoinOperatorImpl((StreamTableJoinOperatorSpec) operatorSpec, config, context); } else if (operatorSpec instanceof 
SendToTableOperatorSpec) { @@ -231,23 +237,24 @@ public class OperatorImplGraph { String.format("Unsupported OperatorSpec: %s", operatorSpec.getClass().getName())); } - private PartialJoinOperatorImpl createPartialJoinOperatorImpl(OperatorSpec prevOperatorSpec, - JoinOperatorSpec joinOpSpec, Config config, TaskContext context, Clock clock) { - KV<PartialJoinFunction, PartialJoinFunction> partialJoinFunctions = getOrCreatePartialJoinFunctions(joinOpSpec); - if (joinOpSpec.getLeftInputOpSpec().equals(prevOperatorSpec)) { // we got here from the left side of the join - return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ true, - partialJoinFunctions.getKey(), partialJoinFunctions.getValue(), config, context, clock); + private PartialJoinOperatorImpl getOrCreatePartialJoinOpImpls(JoinOperatorSpec joinOpSpec, boolean isLeft, + Config config, TaskContext context, Clock clock) { + // get the per task pair of PartialJoinOperatorImpl for the corresponding {@code joinOpSpec} + KV<PartialJoinOperatorImpl, PartialJoinOperatorImpl> partialJoinOpImpls = joinOpImpls.computeIfAbsent(joinOpSpec.getOpId(), + joinOpId -> { + PartialJoinFunction leftJoinFn = createLeftJoinFn(joinOpSpec); + PartialJoinFunction rightJoinFn = createRightJoinFn(joinOpSpec); + return new KV(new PartialJoinOperatorImpl(joinOpSpec, true, leftJoinFn, rightJoinFn, config, context, clock), + new PartialJoinOperatorImpl(joinOpSpec, false, rightJoinFn, leftJoinFn, config, context, clock)); + }); + + if (isLeft) { // we got here from the left side of the join + return partialJoinOpImpls.getKey(); } else { // we got here from the right side of the join - return new PartialJoinOperatorImpl(joinOpSpec, /* isLeftSide */ false, - partialJoinFunctions.getValue(), partialJoinFunctions.getKey(), config, context, clock); + return partialJoinOpImpls.getValue(); } } - private KV<PartialJoinFunction, PartialJoinFunction> getOrCreatePartialJoinFunctions(JoinOperatorSpec joinOpSpec) { - return 
joinFunctions.computeIfAbsent(joinOpSpec.getOpId(), - joinOpId -> KV.of(createLeftJoinFn(joinOpSpec), createRightJoinFn(joinOpSpec))); - } - private PartialJoinFunction<Object, Object, Object, Object> createLeftJoinFn(JoinOperatorSpec joinOpSpec) { return new PartialJoinFunction<Object, Object, Object, Object>() { private final JoinFunction joinFn = joinOpSpec.getJoinFn(); @@ -316,8 +323,8 @@ public class OperatorImplGraph { }; } - private boolean hasIntermediateStreams(StreamGraphImpl streamGraph) { - return !Collections.disjoint(streamGraph.getInputOperators().keySet(), streamGraph.getOutputStreams().keySet()); + private boolean hasIntermediateStreams(OperatorSpecGraph specGraph) { + return !Collections.disjoint(specGraph.getInputOperators().keySet(), specGraph.getOutputStreams().keySet()); } /** @@ -358,12 +365,12 @@ public class OperatorImplGraph { /** * calculate the mapping from output streams to input streams - * @param streamGraph the user {@link StreamGraphImpl} instance + * @param specGraph the user {@link OperatorSpecGraph} * @return mapping from output streams to input streams */ - static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(StreamGraphImpl streamGraph) { + static Multimap<SystemStream, SystemStream> getIntermediateToInputStreamsMap(OperatorSpecGraph specGraph) { Multimap<SystemStream, SystemStream> outputToInputStreams = HashMultimap.create(); - streamGraph.getInputOperators().entrySet().stream() + specGraph.getInputOperators().entrySet().stream() .forEach( entry -> computeOutputToInput(entry.getKey().toSystemStream(), entry.getValue(), outputToInputStreams)); return outputToInputStreams; http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java 
b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java index 27bef87..e625484 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/OutputOperatorImpl.java @@ -42,11 +42,10 @@ class OutputOperatorImpl<M> extends OperatorImpl<M, Void> { private final OutputStreamImpl<M> outputStream; private final SystemStream systemStream; - OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec, Config config, TaskContext context) { + OutputOperatorImpl(OutputOperatorSpec<M> outputOpSpec) { this.outputOpSpec = outputOpSpec; this.outputStream = outputOpSpec.getOutputStream(); - this.systemStream = new SystemStream(outputStream.getStreamSpec().getSystemName(), - outputStream.getStreamSpec().getPhysicalName()); + this.systemStream = outputStream.getSystemStream(); } @Override http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java index 9fc1e7c..dd64429 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/PartitionByOperatorImpl.java @@ -21,6 +21,7 @@ package org.apache.samza.operators.impl; import org.apache.samza.config.Config; import org.apache.samza.container.TaskContextImpl; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; import org.apache.samza.operators.spec.PartitionByOperatorSpec; @@ -36,7 +37,6 @@ import org.apache.samza.task.TaskCoordinator; 
import java.util.Collection; import java.util.Collections; -import java.util.function.Function; /** @@ -46,17 +46,15 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { private final PartitionByOperatorSpec<M, K, V> partitionByOpSpec; private final SystemStream systemStream; - private final Function<? super M, ? extends K> keyFunction; - private final Function<? super M, ? extends V> valueFunction; + private final MapFunction<? super M, ? extends K> keyFunction; + private final MapFunction<? super M, ? extends V> valueFunction; private final String taskName; private final ControlMessageSender controlMessageSender; PartitionByOperatorImpl(PartitionByOperatorSpec<M, K, V> partitionByOpSpec, Config config, TaskContext context) { this.partitionByOpSpec = partitionByOpSpec; OutputStreamImpl<KV<K, V>> outputStream = partitionByOpSpec.getOutputStream(); - this.systemStream = new SystemStream( - outputStream.getStreamSpec().getSystemName(), - outputStream.getStreamSpec().getPhysicalName()); + this.systemStream = outputStream.getSystemStream(); this.keyFunction = partitionByOpSpec.getKeyFunction(); this.valueFunction = partitionByOpSpec.getValueFunction(); this.taskName = context.getTaskName().getTaskName(); @@ -66,6 +64,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { @Override protected void handleInit(Config config, TaskContext context) { + this.keyFunction.init(config, context); + this.valueFunction.init(config, context); } @Override @@ -80,6 +80,8 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { @Override protected void handleClose() { + this.keyFunction.close(); + this.valueFunction.close(); } @Override @@ -100,7 +102,7 @@ class PartitionByOperatorImpl<M, K, V> extends OperatorImpl<M, Void> { } private void sendControlMessage(ControlMessage message, MessageCollector collector) { - SystemStream outputStream = partitionByOpSpec.getOutputStream().getStreamSpec().toSystemStream(); + SystemStream 
outputStream = partitionByOpSpec.getOutputStream().getSystemStream(); controlMessageSender.send(message, outputStream, collector); } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java index a51d5e6..6cd426b 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/StreamOperatorImpl.java @@ -40,8 +40,7 @@ class StreamOperatorImpl<M, RM> extends OperatorImpl<M, RM> { private final StreamOperatorSpec<M, RM> streamOpSpec; private final FlatMapFunction<M, RM> transformFn; - StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec, - Config config, TaskContext context) { + StreamOperatorImpl(StreamOperatorSpec<M, RM> streamOpSpec) { this.streamOpSpec = streamOpSpec; this.transformFn = streamOpSpec.getTransformFn(); } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java index 32406cb..6b5baae 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/impl/WindowOperatorImpl.java @@ -23,6 +23,8 @@ package org.apache.samza.operators.impl; import com.google.common.base.Preconditions; import org.apache.samza.config.Config; import org.apache.samza.operators.functions.FoldLeftFunction; +import 
org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.SupplierFunction; import org.apache.samza.operators.impl.store.TimeSeriesKey; import org.apache.samza.operators.impl.store.TimeSeriesStore; import org.apache.samza.operators.impl.store.TimeSeriesStoreImpl; @@ -58,8 +60,6 @@ import java.util.List; import java.util.Map; import java.util.Optional; import java.util.Set; -import java.util.function.Function; -import java.util.function.Supplier; import java.util.stream.Collectors; /** @@ -93,8 +93,8 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje private final Clock clock; private final WindowInternal<M, K, Object> window; private final FoldLeftFunction<M, Object> foldLeftFn; - private final Supplier<Object> initializer; - private final Function<M, K> keyFn; + private final SupplierFunction<Object> initializer; + private final MapFunction<M, K> keyFn; private final TriggerScheduler<K> triggerScheduler; private final Map<TriggerKey<K>, TriggerImplHandler> triggers = new HashMap<>(); @@ -112,11 +112,18 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje @Override protected void handleInit(Config config, TaskContext context) { - WindowInternal<M, K, Object> window = windowOpSpec.getWindow(); KeyValueStore<TimeSeriesKey<K>, Object> store = (KeyValueStore<TimeSeriesKey<K>, Object>) context.getStore(windowOpSpec.getOpId()); + if (initializer != null) { + initializer.init(config, context); + } + + if (keyFn != null) { + keyFn.init(config, context); + } + // For aggregating windows, we use the store in over-write mode since we only retain the aggregated // value. Else, we use the store in append-mode. 
if (foldLeftFn != null) { @@ -215,6 +222,12 @@ public class WindowOperatorImpl<M, K> extends OperatorImpl<M, WindowPane<K, Obje if (timeSeriesStore != null) { timeSeriesStore.close(); } + if (initializer != null) { + initializer.close(); + } + if (keyFn != null) { + keyFn.close(); + } } private TriggerImplHandler getOrCreateTriggerImplHandler(TriggerKey<K> triggerKey, Trigger<M> trigger) { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java new file mode 100644 index 0000000..a5cdb82 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FilterOperatorSpec.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.spec; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.samza.config.Config; +import org.apache.samza.operators.functions.FilterFunction; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.task.TaskContext; + + +/** + * The spec for an operator that filters input messages based on some conditions. + * + * @param <M> type of input message + */ +class FilterOperatorSpec<M> extends StreamOperatorSpec<M, M> { + private final FilterFunction<M> filterFn; + + FilterOperatorSpec(FilterFunction<M> filterFn, String opId) { + super(new FlatMapFunction<M, M>() { + @Override + public Collection<M> apply(M message) { + return new ArrayList<M>() { + { + if (filterFn.apply(message)) { + this.add(message); + } + } + }; + } + + @Override + public void init(Config config, TaskContext context) { + filterFn.init(config, context); + } + + @Override + public void close() { + filterFn.close(); + } + }, OpCode.FILTER, opId); + this.filterFn = filterFn; + } + + @Override + public WatermarkFunction getWatermarkFn() { + return this.filterFn instanceof WatermarkFunction ? (WatermarkFunction) this.filterFn : null; + } + + @Override + public TimerFunction getTimerFn() { + return this.filterFn instanceof TimerFunction ? 
(TimerFunction) this.filterFn : null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java new file mode 100644 index 0000000..a93a221 --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/FlatMapOperatorSpec.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.WatermarkFunction; + + +/** + * The spec for an operator that transforms each input message to a collection of output messages. 
+ * + * @param <M> type of input message + * @param <OM> type of output messages + */ +class FlatMapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> { + + FlatMapOperatorSpec(FlatMapFunction<M, OM> flatMapFn, String opId) { + super(flatMapFn, OpCode.FLAT_MAP, opId); + } + + @Override + public WatermarkFunction getWatermarkFn() { + return this.transformFn instanceof WatermarkFunction ? (WatermarkFunction) this.transformFn : null; + } + + @Override + public TimerFunction getTimerFn() { + return this.transformFn instanceof TimerFunction ? (TimerFunction) this.transformFn : null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java index 2ed1e30..a636ac5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/InputOperatorSpec.java @@ -20,8 +20,8 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.TimerFunction; -import org.apache.samza.serializers.Serde; import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; /** @@ -33,10 +33,15 @@ import org.apache.samza.system.StreamSpec; */ public class InputOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Object> { // Object == KV<K, V> | V - private final StreamSpec streamSpec; - private final Serde<K> keySerde; - private final Serde<V> valueSerde; private final boolean isKeyed; + private final StreamSpec streamSpec; + + /** + * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the configs for 
a stream, and deserialized + * once during startup in SamzaContainer. They don't need to be deserialized here on a per-task basis + */ + private transient final Serde<K> keySerde; + private transient final Serde<V> valueSerde; public InputOperatorSpec(StreamSpec streamSpec, Serde<K> keySerde, Serde<V> valueSerde, boolean isKeyed, String opId) { http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java index 9e058ff..a218135 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/JoinOperatorSpec.java @@ -42,14 +42,20 @@ import java.util.Map; */ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> implements StatefulOperatorSpec { // Object == M | OM - private final OperatorSpec<?, M> leftInputOpSpec; - private final OperatorSpec<?, OM> rightInputOpSpec; private final JoinFunction<K, M, OM, JM> joinFn; - private final Serde<K> keySerde; - private final Serde<TimestampedValue<M>> messageSerde; - private final Serde<TimestampedValue<OM>> otherMessageSerde; private final long ttlMs; + private final OperatorSpec<?, M> leftInputOpSpec; + private final OperatorSpec<?, OM> rightInputOpSpec; + + /** + * The following {@link Serde}s are serialized by the ExecutionPlanner when generating the store configs for a join, and + * deserialized once during startup in SamzaContainer. 
They don't need to be deserialized here on a per-task basis + */ + private transient final Serde<K> keySerde; + private transient final Serde<TimestampedValue<M>> messageSerde; + private transient final Serde<TimestampedValue<OM>> otherMessageSerde; + /** * Default constructor for a {@link JoinOperatorSpec}. * @@ -126,4 +132,5 @@ public class JoinOperatorSpec<K, M, OM, JM> extends OperatorSpec<Object, JM> imp public long getTtlMs() { return ttlMs; } + } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java new file mode 100644 index 0000000..1e2190b --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MapOperatorSpec.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.samza.operators.spec; + +import java.util.ArrayList; +import java.util.Collection; +import org.apache.samza.config.Config; +import org.apache.samza.operators.functions.FlatMapFunction; +import org.apache.samza.operators.functions.MapFunction; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.WatermarkFunction; +import org.apache.samza.task.TaskContext; + + +/** + * The spec for an operator that transforms each input message to a single output message. + * + * @param <M> type of input message + * @param <OM> type of output messages + */ +class MapOperatorSpec<M, OM> extends StreamOperatorSpec<M, OM> { + + private final MapFunction<M, OM> mapFn; + + MapOperatorSpec(MapFunction<M, OM> mapFn, String opId) { + super(new FlatMapFunction<M, OM>() { + @Override + public Collection<OM> apply(M message) { + return new ArrayList<OM>() { + { + OM r = mapFn.apply(message); + if (r != null) { + this.add(r); + } + } + }; + } + + @Override + public void init(Config config, TaskContext context) { + mapFn.init(config, context); + } + + @Override + public void close() { + mapFn.close(); + } + }, OpCode.MAP, opId); + this.mapFn = mapFn; + } + + @Override + public WatermarkFunction getWatermarkFn() { + return this.mapFn instanceof WatermarkFunction ? (WatermarkFunction) this.mapFn : null; + } + + @Override + public TimerFunction getTimerFn() { + return this.mapFn instanceof TimerFunction ? 
(TimerFunction) this.mapFn : null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java new file mode 100644 index 0000000..987f72c --- /dev/null +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/MergeOperatorSpec.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.samza.operators.spec; + +import java.util.ArrayList; +import org.apache.samza.operators.functions.TimerFunction; +import org.apache.samza.operators.functions.WatermarkFunction; + + +/** + * The spec for an operator that combines messages from all input streams into a single output stream. 
+ * + * @param <M> the type of messages in all input streams + */ +class MergeOperatorSpec<M> extends StreamOperatorSpec<M, M> { + + MergeOperatorSpec(String opId) { + super((M message) -> + new ArrayList<M>() { + { + this.add(message); + } + }, OperatorSpec.OpCode.MERGE, opId); + } + + @Override + public WatermarkFunction getWatermarkFn() { + return null; + } + + @Override + public TimerFunction getTimerFn() { + return null; + } +} http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java index 7b0a41b..e1e1c55 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpec.java @@ -18,9 +18,9 @@ */ package org.apache.samza.operators.spec; +import java.io.Serializable; import java.util.Collection; import java.util.LinkedHashSet; -import java.util.Set; import org.apache.samza.annotation.InterfaceStability; import org.apache.samza.operators.MessageStream; @@ -30,14 +30,14 @@ import org.apache.samza.operators.functions.WatermarkFunction; /** * A stream operator specification that holds all the information required to transform - * the input {@link org.apache.samza.operators.MessageStreamImpl} and produce the output - * {@link org.apache.samza.operators.MessageStreamImpl}. + * the input {@link MessageStreamImpl} and produce the output + * {@link MessageStreamImpl}. 
* * @param <M> the type of input message to the operator * @param <OM> the type of output message from the operator */ @InterfaceStability.Unstable -public abstract class OperatorSpec<M, OM> { +public abstract class OperatorSpec<M, OM> implements Serializable { public enum OpCode { INPUT, @@ -61,9 +61,15 @@ public abstract class OperatorSpec<M, OM> { /** * The set of operators that consume the messages produced from this operator. * <p> - * We use a LinkedHashSet since we need deterministic ordering in initializing/closing operators. + * We use a LinkedHashSet since we need both deterministic ordering in initializing/closing operators and serializability. */ - private final Set<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>(); + private final LinkedHashSet<OperatorSpec<OM, ?>> nextOperatorSpecs = new LinkedHashSet<>(); + + // this method is used in unit tests to verify an {@link OperatorSpec} instance is a deserialized copy of this object. + final boolean isClone(OperatorSpec other) { + return this != other && this.getClass().isAssignableFrom(other.getClass()) + && this.opCode.equals(other.opCode) && this.opId.equals(other.opId); + } public OperatorSpec(OpCode opCode, String opId) { this.opCode = opCode; @@ -79,6 +85,11 @@ public abstract class OperatorSpec<M, OM> { nextOperatorSpecs.add(nextOperatorSpec); } + /** + * Get the collection of chained {@link OperatorSpec}s that are consuming the output of this node + * + * @return the collection of chained {@link OperatorSpec}s + */ public Collection<OperatorSpec<OM, ?>> getRegisteredOperatorSpecs() { return nextOperatorSpecs; } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java index 
c38f6e8..6e98d5a 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OperatorSpecs.java @@ -19,11 +19,6 @@ package org.apache.samza.operators.spec; -import java.util.ArrayList; -import java.util.Collection; -import java.util.function.Function; - -import org.apache.samza.config.Config; import org.apache.samza.operators.KV; import org.apache.samza.operators.functions.FilterFunction; import org.apache.samza.operators.functions.FlatMapFunction; @@ -35,7 +30,6 @@ import org.apache.samza.operators.windows.internal.WindowInternal; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; import org.apache.samza.table.TableSpec; -import org.apache.samza.task.TaskContext; /** @@ -73,29 +67,7 @@ public class OperatorSpecs { */ public static <M, OM> StreamOperatorSpec<M, OM> createMapOperatorSpec( MapFunction<? super M, ? extends OM> mapFn, String opId) { - return new StreamOperatorSpec<>(new FlatMapFunction<M, OM>() { - @Override - public Collection<OM> apply(M message) { - return new ArrayList<OM>() { - { - OM r = mapFn.apply(message); - if (r != null) { - this.add(r); - } - } - }; - } - - @Override - public void init(Config config, TaskContext context) { - mapFn.init(config, context); - } - - @Override - public void close() { - mapFn.close(); - } - }, mapFn, OperatorSpec.OpCode.MAP, opId); + return new MapOperatorSpec<>((MapFunction<M, OM>) mapFn, opId); } /** @@ -108,28 +80,7 @@ public class OperatorSpecs { */ public static <M> StreamOperatorSpec<M, M> createFilterOperatorSpec( FilterFunction<? 
super M> filterFn, String opId) { - return new StreamOperatorSpec<>(new FlatMapFunction<M, M>() { - @Override - public Collection<M> apply(M message) { - return new ArrayList<M>() { - { - if (filterFn.apply(message)) { - this.add(message); - } - } - }; - } - - @Override - public void init(Config config, TaskContext context) { - filterFn.init(config, context); - } - - @Override - public void close() { - filterFn.close(); - } - }, filterFn, OperatorSpec.OpCode.FILTER, opId); + return new FilterOperatorSpec<>((FilterFunction<M>) filterFn, opId); } /** @@ -143,7 +94,7 @@ public class OperatorSpecs { */ public static <M, OM> StreamOperatorSpec<M, OM> createFlatMapOperatorSpec( FlatMapFunction<? super M, ? extends OM> flatMapFn, String opId) { - return new StreamOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, flatMapFn, OperatorSpec.OpCode.FLAT_MAP, opId); + return new FlatMapOperatorSpec<>((FlatMapFunction<M, OM>) flatMapFn, opId); } /** @@ -183,8 +134,8 @@ public class OperatorSpecs { * @return the {@link OutputOperatorSpec} for the partitionBy operator */ public static <M, K, V> PartitionByOperatorSpec<M, K, V> createPartitionByOperatorSpec( - OutputStreamImpl<KV<K, V>> outputStream, Function<? super M, ? extends K> keyFunction, - Function<? super M, ? extends V> valueFunction, String opId) { + OutputStreamImpl<KV<K, V>> outputStream, MapFunction<? super M, ? extends K> keyFunction, + MapFunction<? super M, ? 
extends V> valueFunction, String opId) { return new PartitionByOperatorSpec<>(outputStream, keyFunction, valueFunction, opId); } @@ -198,7 +149,6 @@ public class OperatorSpecs { * @param <WV> the type of value in the window * @return the {@link WindowOperatorSpec} */ - public static <M, WK, WV> WindowOperatorSpec<M, WK, WV> createWindowOperatorSpec( WindowInternal<M, WK, WV> window, String opId) { return new WindowOperatorSpec<>(window, opId); @@ -236,13 +186,7 @@ public class OperatorSpecs { * @return the {@link StreamOperatorSpec} for the merge */ public static <M> StreamOperatorSpec<M, M> createMergeOperatorSpec(String opId) { - return new StreamOperatorSpec<>(message -> - new ArrayList<M>() { - { - this.add(message); - } - }, - null, OperatorSpec.OpCode.MERGE, opId); + return new MergeOperatorSpec<>(opId); } /** @@ -266,7 +210,6 @@ public class OperatorSpecs { * Creates a {@link SendToTableOperatorSpec} with a key extractor and a value extractor function, * the type of incoming message is expected to be KV<K, V>. 
* - * @param inputOpSpec the operator spec for the input stream * @param tableSpec the table spec for the underlying table * @param opId the unique ID of the operator * @param <K> the type of the table record key @@ -274,8 +217,8 @@ public class OperatorSpecs { * @return the {@link SendToTableOperatorSpec} */ public static <K, V> SendToTableOperatorSpec<K, V> createSendToTableOperatorSpec( - OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) { - return new SendToTableOperatorSpec(inputOpSpec, tableSpec, opId); + TableSpec tableSpec, String opId) { + return new SendToTableOperatorSpec(tableSpec, opId); } /** http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java index e439c4e..fe0abcb 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/OutputStreamImpl.java @@ -18,18 +18,25 @@ */ package org.apache.samza.operators.spec; +import java.io.Serializable; import org.apache.samza.operators.OutputStream; import org.apache.samza.serializers.Serde; import org.apache.samza.system.StreamSpec; +import org.apache.samza.system.SystemStream; -public class OutputStreamImpl<M> implements OutputStream<M> { +public class OutputStreamImpl<M> implements OutputStream<M>, Serializable { private final StreamSpec streamSpec; - private final Serde keySerde; - private final Serde valueSerde; private final boolean isKeyed; + /** + * The following fields are serialized by the ExecutionPlanner when generating the configs for the output stream, and + * deserialized once during startup in SamzaContainer. 
They don't need to be deserialized here on a per-task basis + */ + private transient final Serde keySerde; + private transient final Serde valueSerde; + public OutputStreamImpl(StreamSpec streamSpec, Serde keySerde, Serde valueSerde, boolean isKeyed) { this.streamSpec = streamSpec; @@ -50,6 +57,10 @@ public class OutputStreamImpl<M> implements OutputStream<M> { return valueSerde; } + public SystemStream getSystemStream() { + return this.streamSpec.toSystemStream(); + } + public boolean isKeyed() { return isKeyed; } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java index a0a9b61..d6bf3d9 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/PartitionByOperatorSpec.java @@ -19,10 +19,11 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.KV; +import org.apache.samza.operators.functions.MapFunction; import org.apache.samza.operators.functions.TimerFunction; import org.apache.samza.operators.functions.WatermarkFunction; -import java.util.function.Function; +import static com.google.common.base.Preconditions.checkArgument; /** @@ -39,21 +40,25 @@ import java.util.function.Function; public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> { private final OutputStreamImpl<KV<K, V>> outputStream; - private final Function<? super M, ? extends K> keyFunction; - private final Function<? super M, ? extends V> valueFunction; + private final MapFunction<? super M, ? extends K> keyFunction; + private final MapFunction<? super M, ? 
extends V> valueFunction; /** * Constructs a {@link PartitionByOperatorSpec} to send messages to the provided {@code outputStream} * * @param outputStream the {@link OutputStreamImpl} to send messages to - * @param keyFunction the {@link Function} for extracting the key from the message - * @param valueFunction the {@link Function} for extracting the value from the message + * @param keyFunction the {@link MapFunction} for extracting the key from the message + * @param valueFunction the {@link MapFunction} for extracting the value from the message * @param opId the unique ID of this {@link PartitionByOperatorSpec} in the graph */ PartitionByOperatorSpec(OutputStreamImpl<KV<K, V>> outputStream, - Function<? super M, ? extends K> keyFunction, - Function<? super M, ? extends V> valueFunction, String opId) { + MapFunction<? super M, ? extends K> keyFunction, + MapFunction<? super M, ? extends V> valueFunction, String opId) { super(OpCode.PARTITION_BY, opId); + checkArgument(!(keyFunction instanceof TimerFunction || keyFunction instanceof WatermarkFunction), + "keyFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); + checkArgument(!(valueFunction instanceof TimerFunction || valueFunction instanceof WatermarkFunction), + "valueFunction for partitionBy should not implement TimerFunction or WatermarkFunction."); this.outputStream = outputStream; this.keyFunction = keyFunction; this.valueFunction = valueFunction; @@ -67,11 +72,11 @@ public class PartitionByOperatorSpec<M, K, V> extends OperatorSpec<M, Void> { return this.outputStream; } - public Function<? super M, ? extends K> getKeyFunction() { + public MapFunction<? super M, ? extends K> getKeyFunction() { return keyFunction; } - public Function<? super M, ? extends V> getValueFunction() { + public MapFunction<? super M, ? 
extends V> getValueFunction() { return valueFunction; } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java index e1b51be..22f393e 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/SendToTableOperatorSpec.java @@ -35,26 +35,19 @@ import org.apache.samza.table.TableSpec; @InterfaceStability.Unstable public class SendToTableOperatorSpec<K, V> extends OperatorSpec<KV<K, V>, Void> { - private final OperatorSpec<?, KV<K, V>> inputOpSpec; private final TableSpec tableSpec; /** * Constructor for a {@link SendToTableOperatorSpec}. * - * @param inputOpSpec the operator spec of the input stream * @param tableSpec the table spec of the table written to * @param opId the unique ID for this operator */ - SendToTableOperatorSpec(OperatorSpec<?, KV<K, V>> inputOpSpec, TableSpec tableSpec, String opId) { + SendToTableOperatorSpec(TableSpec tableSpec, String opId) { super(OpCode.SEND_TO, opId); - this.inputOpSpec = inputOpSpec; this.tableSpec = tableSpec; } - public OperatorSpec<?, KV<K, V>> getInputOpSpec() { - return inputOpSpec; - } - public TableSpec getTableSpec() { return tableSpec; } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java index 644eb6c..3addbf7 100644 --- 
a/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/StreamOperatorSpec.java @@ -19,46 +19,31 @@ package org.apache.samza.operators.spec; import org.apache.samza.operators.functions.FlatMapFunction; -import org.apache.samza.operators.functions.TimerFunction; -import org.apache.samza.operators.functions.WatermarkFunction; - /** - * The spec for a simple stream operator that outputs 0 or more messages for each input message. + * The common spec for a simple stream operator that outputs 0 or more messages for each input message. * * @param <M> the type of input message * @param <OM> the type of output message */ -public class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { +public abstract class StreamOperatorSpec<M, OM> extends OperatorSpec<M, OM> { - private final FlatMapFunction<M, OM> transformFn; - private final Object originalFn; + protected final FlatMapFunction<M, OM> transformFn; /** * Constructor for a {@link StreamOperatorSpec}. * * @param transformFn the transformation function - * @param originalFn the original user function before wrapping to transformFn * @param opCode the {@link OpCode} for this {@link StreamOperatorSpec} * @param opId the unique ID for this {@link StreamOperatorSpec} */ - StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, Object originalFn, OperatorSpec.OpCode opCode, String opId) { + protected StreamOperatorSpec(FlatMapFunction<M, OM> transformFn, OperatorSpec.OpCode opCode, String opId) { super(opCode, opId); this.transformFn = transformFn; - this.originalFn = originalFn; } public FlatMapFunction<M, OM> getTransformFn() { return this.transformFn; } - @Override - public WatermarkFunction getWatermarkFn() { - return originalFn instanceof WatermarkFunction ? (WatermarkFunction) originalFn : null; - } - - @Override - public TimerFunction getTimerFn() { - return originalFn instanceof TimerFunction ? 
(TimerFunction) originalFn : null; - } } http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java index 73d10ff..8d1ad29 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java +++ b/samza-core/src/main/java/org/apache/samza/operators/spec/WindowOperatorSpec.java @@ -40,6 +40,8 @@ import java.util.Collections; import java.util.List; import java.util.stream.Collectors; +import static com.google.common.base.Preconditions.*; + /** * The spec for an operator that groups messages into finite windows for processing @@ -61,6 +63,15 @@ public class WindowOperatorSpec<M, WK, WV> extends OperatorSpec<M, WindowPane<WK */ WindowOperatorSpec(WindowInternal<M, WK, WV> window, String opId) { super(OpCode.WINDOW, opId); + checkArgument(window.getInitializer() == null || + !(window.getInitializer() instanceof TimerFunction || window.getInitializer() instanceof WatermarkFunction), + "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the initializer."); + checkArgument(window.getKeyExtractor() == null || + !(window.getKeyExtractor() instanceof TimerFunction || window.getKeyExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the keyExtractor."); + checkArgument(window.getEventTimeExtractor() == null || + !(window.getEventTimeExtractor() instanceof TimerFunction || window.getEventTimeExtractor() instanceof WatermarkFunction), + "A window does not accepts a user-defined TimerFunction or WatermarkFunction as the eventTimeExtractor."); this.window = window; } 
http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java index 5eeca99..272ba63 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java +++ b/samza-core/src/main/java/org/apache/samza/operators/stream/IntermediateMessageStreamImpl.java @@ -20,7 +20,7 @@ package org.apache.samza.operators.stream; import org.apache.samza.operators.MessageStreamImpl; import org.apache.samza.operators.OutputStream; -import org.apache.samza.operators.StreamGraphImpl; +import org.apache.samza.operators.StreamGraphSpec; import org.apache.samza.operators.spec.InputOperatorSpec; import org.apache.samza.operators.spec.OperatorSpec; import org.apache.samza.operators.spec.OutputStreamImpl; @@ -45,7 +45,7 @@ public class IntermediateMessageStreamImpl<M> extends MessageStreamImpl<M> imple private final OutputStreamImpl<M> outputStream; private final boolean isKeyed; - public IntermediateMessageStreamImpl(StreamGraphImpl graph, InputOperatorSpec<?, M> inputOperatorSpec, + public IntermediateMessageStreamImpl(StreamGraphSpec graph, InputOperatorSpec<?, M> inputOperatorSpec, OutputStreamImpl<M> outputStream) { super(graph, (OperatorSpec<?, M>) inputOperatorSpec); this.outputStream = outputStream; http://git-wip-us.apache.org/repos/asf/samza/blob/53d7f262/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java index 
ca0ba67..96defd5 100644 --- a/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java +++ b/samza-core/src/main/java/org/apache/samza/operators/triggers/Cancellable.java @@ -30,5 +30,5 @@ public interface Cancellable { * * @return the result of the cancelation */ - public boolean cancel(); + boolean cancel(); }
