rkhachatryan commented on a change in pull request #11403: [FLINK-16316][operators] Implement new StreamOperatorBase as a replacement for AbstractStreamOperator URL: https://github.com/apache/flink/pull/11403#discussion_r397059586
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractStreamOperatorV2.java ########## @@ -0,0 +1,488 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.VisibleForTesting; +import org.apache.flink.api.common.ExecutionConfig; +import org.apache.flink.api.common.state.KeyedStateStore; +import org.apache.flink.api.common.state.State; +import org.apache.flink.api.common.state.StateDescriptor; +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.functions.KeySelector; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.MetricOptions; +import org.apache.flink.core.fs.CloseableRegistry; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.checkpoint.CheckpointOptions; +import org.apache.flink.runtime.execution.Environment; +import org.apache.flink.runtime.jobgraph.OperatorID; +import org.apache.flink.runtime.metrics.groups.OperatorMetricGroup; +import org.apache.flink.runtime.metrics.groups.TaskManagerJobMetricGroup; 
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups; +import org.apache.flink.runtime.state.CheckpointStreamFactory; +import org.apache.flink.runtime.state.KeyedStateBackend; +import org.apache.flink.runtime.state.OperatorStateBackend; +import org.apache.flink.runtime.state.StateInitializationContext; +import org.apache.flink.runtime.state.StateSnapshotContext; +import org.apache.flink.runtime.state.VoidNamespace; +import org.apache.flink.runtime.state.VoidNamespaceSerializer; +import org.apache.flink.streaming.api.graph.StreamConfig; +import org.apache.flink.streaming.api.watermark.Watermark; +import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; +import org.apache.flink.streaming.runtime.tasks.StreamTask; +import org.apache.flink.streaming.util.LatencyStats; +import org.apache.flink.util.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Arrays; +import java.util.Locale; +import java.util.Optional; + +/** + * New base class for all stream operators, intended to eventually replace {@link AbstractStreamOperator}. + * Currently intended to work smoothly just with {@link MultipleInputStreamOperator}. + * + * <p>One notable difference in comparison to {@link AbstractStreamOperator} is lack of + * {@link AbstractStreamOperator#setup(StreamTask, StreamConfig, Output)} in favor of initialisation + * in the constructor, and removed some tight coupling with classes like {@link StreamTask}. + * + * <p>Methods are guaranteed not to be called concurrently. + * + * @param <OUT> The output type of the operator + */ +@Experimental +public abstract class AbstractStreamOperatorV2<OUT> implements StreamOperator<OUT> { + /** The logger used by the operator class and its subclasses. 
*/ + protected static final Logger LOG = LoggerFactory.getLogger(AbstractStreamOperatorV2.class); + + protected final StreamConfig config; + protected final Output<StreamRecord<OUT>> output; + private final StreamingRuntimeContext runtimeContext; + private final ExecutionConfig executionConfig; + private final ClassLoader userCodeClassLoader; + private final CloseableRegistry cancelables; + private final long[] inputWatermarks; + + /** Metric group for the operator. */ + protected final OperatorMetricGroup metrics; + protected final LatencyStats latencyStats; + protected final ProcessingTimeService processingTimeService; + + private StreamOperatorStateHandler stateHandler; + + // We keep track of watermarks from both inputs, the combined input is the minimum + // Once the minimum advances we emit a new watermark for downstream operators + private long combinedWatermark = Long.MIN_VALUE; + + public AbstractStreamOperatorV2(StreamOperatorParameters<OUT> parameters, int numberOfInputs) { + inputWatermarks = new long[numberOfInputs]; + Arrays.fill(inputWatermarks, Long.MIN_VALUE); + final Environment environment = parameters.getContainingTask().getEnvironment(); + config = parameters.getStreamConfig(); + CountingOutput<OUT> countingOutput; + OperatorMetricGroup operatorMetricGroup; + try { + operatorMetricGroup = environment.getMetricGroup().getOrAddOperator(config.getOperatorID(), config.getOperatorName()); + countingOutput = new CountingOutput(parameters.getOutput(), operatorMetricGroup.getIOMetricGroup().getNumRecordsOutCounter()); + if (config.isChainStart()) { + operatorMetricGroup.getIOMetricGroup().reuseInputMetricsForTask(); + } + if (config.isChainEnd()) { + operatorMetricGroup.getIOMetricGroup().reuseOutputMetricsForTask(); + } + } catch (Exception e) { + LOG.warn("An error occurred while instantiating task metrics.", e); + countingOutput = null; + operatorMetricGroup = null; + } + + if (countingOutput == null || operatorMetricGroup == null) { + metrics = 
UnregisteredMetricGroups.createUnregisteredOperatorMetricGroup(); + output = parameters.getOutput(); + } + else { + metrics = operatorMetricGroup; + output = countingOutput; + } + + latencyStats = createLatencyStats( + environment.getTaskManagerInfo().getConfiguration(), + parameters.getContainingTask().getIndexInSubtaskGroup()); + + processingTimeService = Preconditions.checkNotNull(parameters.getProcessingTimeService()); + executionConfig = parameters.getContainingTask().getExecutionConfig(); + userCodeClassLoader = parameters.getContainingTask().getUserCodeClassLoader(); + cancelables = parameters.getContainingTask().getCancelables(); + + runtimeContext = new StreamingRuntimeContext( + environment, + environment.getAccumulatorRegistry().getUserMap(), + operatorMetricGroup, + getOperatorID(), + processingTimeService, + null); + } + + private LatencyStats createLatencyStats(Configuration taskManagerConfig, int indexInSubtaskGroup) { + try { + int historySize = taskManagerConfig.getInteger(MetricOptions.LATENCY_HISTORY_SIZE); + if (historySize <= 0) { + LOG.warn("{} has been set to a value equal or below 0: {}. Using default.", MetricOptions.LATENCY_HISTORY_SIZE, historySize); + historySize = MetricOptions.LATENCY_HISTORY_SIZE.defaultValue(); + } + + final String configuredGranularity = taskManagerConfig.getString(MetricOptions.LATENCY_SOURCE_GRANULARITY); + LatencyStats.Granularity granularity; + try { + granularity = LatencyStats.Granularity.valueOf(configuredGranularity.toUpperCase(Locale.ROOT)); + } catch (IllegalArgumentException iae) { + granularity = LatencyStats.Granularity.OPERATOR; + LOG.warn( + "Configured value {} option for {} is invalid. 
Defaulting to {}.", + configuredGranularity, + MetricOptions.LATENCY_SOURCE_GRANULARITY.key(), + granularity); + } + TaskManagerJobMetricGroup jobMetricGroup = this.metrics.parent().parent(); + return new LatencyStats(jobMetricGroup.addGroup("latency"), + historySize, + indexInSubtaskGroup, + getOperatorID(), + granularity); + } catch (Exception e) { + LOG.warn("An error occurred while instantiating latency metrics.", e); + return new LatencyStats( + UnregisteredMetricGroups.createUnregisteredTaskManagerJobMetricGroup().addGroup("latency"), + 1, + 0, + new OperatorID(), + LatencyStats.Granularity.SINGLE); + } + } + + @Override + public MetricGroup getMetricGroup() { + return metrics; + } + + @Override + public final void initializeState(StreamTaskStateInitializer streamTaskStateManager) throws Exception { + final TypeSerializer<?> keySerializer = config.getStateKeySerializer(getUserCodeClassloader()); + + final StreamOperatorStateContext context = + streamTaskStateManager.streamOperatorStateContext( + getOperatorID(), + getClass().getSimpleName(), + getProcessingTimeService(), + this, + keySerializer, + cancelables, + metrics); + + stateHandler = new StreamOperatorStateHandler(context, getExecutionConfig(), cancelables); + stateHandler.initializeOperatorState(this::initializeState); + } + + /** + * This method is called immediately before any elements are processed, it should contain the + * operator's initialization logic, e.g. state initialization. + * + * <p>The default implementation does nothing. + * + * @throws Exception An exception in this method causes the operator to fail. + */ + @Override + public void open() throws Exception {} + + /** + * This method is called after all records have been added to the operators via the methods + * {@link OneInputStreamOperator#processElement(StreamRecord)}, or + * {@link TwoInputStreamOperator#processElement1(StreamRecord)} and + * {@link TwoInputStreamOperator#processElement2(StreamRecord)}. 
+ * + * <p>The method is expected to flush all remaining buffered data. Exceptions during this flushing + * of buffered data should be propagated, in order to cause the operation to be recognized as failed, + * because the last data items are not processed properly. + * + * @throws Exception An exception in this method causes the operator to fail. + */ + @Override + public void close() throws Exception {} + + /** + * This method is called at the very end of the operator's life, both in the case of a successful + * completion of the operation, and in the case of a failure and canceling. + * + * <p>This method is expected to make a thorough effort to release all resources + * that the operator has acquired. + */ + @Override + public void dispose() throws Exception { + if (stateHandler != null) { + stateHandler.dispose(); + } + } + + @Override + public void prepareSnapshotPreBarrier(long checkpointId) throws Exception { + // the default implementation does nothing and accepts the checkpoint + // this is purely for subclasses to override + } + + @Override + public final OperatorSnapshotFutures snapshotState(long checkpointId, long timestamp, CheckpointOptions checkpointOptions, + CheckpointStreamFactory factory) throws Exception { + return stateHandler.snapshotState( + this::snapshotState, + getOperatorName(), + checkpointId, + timestamp, + checkpointOptions, + factory); + } + + /** + * Stream operators with state, which want to participate in a snapshot need to override this hook method. + * + * @param context context that provides information and means required for taking a snapshot + */ + public void snapshotState(StateSnapshotContext context) throws Exception { + } + + /** + * Stream operators with state which can be restored need to override this hook method. + * + * @param context context that allows to register different states. 
+ */ + public void initializeState(StateInitializationContext context) throws Exception { + + } + + @Override + public void notifyCheckpointComplete(long checkpointId) throws Exception { + stateHandler.notifyCheckpointComplete(checkpointId); + } + + // ------------------------------------------------------------------------ + // Properties and Services + // ------------------------------------------------------------------------ + + /** + * Gets the execution config defined on the execution environment of the job to which this + * operator belongs. + * + * @return The job's execution config. + */ + public ExecutionConfig getExecutionConfig() { + return executionConfig; + } + + public StreamConfig getOperatorConfig() { + return config; + } + + public ClassLoader getUserCodeClassloader() { + return userCodeClassLoader; + } + + /** + * Return the operator name. If the runtime context has been set, then the task name with + * subtask index is returned. Otherwise, the simple class name is returned. + * + * @return If runtime context is set, then return task name with subtask index. Otherwise return + * simple class name. + */ + protected String getOperatorName() { + if (runtimeContext != null) { + return runtimeContext.getTaskNameWithSubtasks(); + } else { + return getClass().getSimpleName(); + } + } + + /** + * Returns a context that allows the operator to query information about the execution and also + * to interact with systems such as broadcast variables and managed state. This also allows + * to register timers. 
+ */ + public StreamingRuntimeContext getRuntimeContext() { + return runtimeContext; + } + + @SuppressWarnings("unchecked") + public <K> KeyedStateBackend<K> getKeyedStateBackend() { + return (KeyedStateBackend<K>) stateHandler.getKeyedStateBackend(); + } + + public OperatorStateBackend getOperatorStateBackend() { + return stateHandler.getOperatorStateBackend(); + } + + /** + * Returns the {@link ProcessingTimeService} responsible for getting the current + * processing time and registering timers. + */ + public ProcessingTimeService getProcessingTimeService() { + return processingTimeService; + } Review comment: I'd rather go with `protected` but I don't think this issue is very important. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services