becketqin commented on a change in pull request #12306:
URL: https://github.com/apache/flink/pull/12306#discussion_r429942801
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java
##########
@@ -61,9 +67,11 @@ public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerT
final SourceOperator<OUT, ?> sourceOperator =
instantiateSourceOperator(
source::createReader,
gateway,
- source.getSplitSerializer());
+ source.getSplitSerializer(),
+ eventTimeConfig);
sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput());
+ sourceOperator.processingTimeService = parameters.getProcessingTimeService();
Review comment:
Might it be better to pass this in through the constructor of `SourceOperator`?
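A minimal sketch of the constructor-injection alternative, using a hypothetical, simplified `SimpleProcessingTimeService` interface (not Flink's real `ProcessingTimeService`). Passing the service through the constructor lets the field be `final` and checked for null once, instead of being assigned by the factory after `setup(...)`.

```java
import java.util.Objects;

/** Hypothetical, simplified stand-in for a processing-time service. */
interface SimpleProcessingTimeService {
    long getCurrentProcessingTime();
}

/** Hypothetical operator that receives its time service at construction time. */
final class ConstructorInjectedOperator {
    // final: the field cannot be left unset or reassigned after construction
    private final SimpleProcessingTimeService processingTimeService;

    ConstructorInjectedOperator(SimpleProcessingTimeService processingTimeService) {
        // fail fast instead of discovering a missing service on first use
        this.processingTimeService =
                Objects.requireNonNull(processingTimeService, "processingTimeService");
    }

    long now() {
        return processingTimeService.getCurrentProcessingTime();
    }
}
```

The trade-off is that every caller constructing the operator must have the service available at that point.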
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+/*
+ Licensed to the Apache Software Foundation (ASF) under one
+ or more contributor license agreements. See the NOTICE file
+ distributed with this work for additional information
+ regarding copyright ownership. The ASF licenses this file
+ to you under the Apache License, Version 2.0 (the
+ "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+ */
+
+package org.apache.flink.api.connector.source;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.api.common.eventtime.Watermark;
+
+/**
+ * The interface provided by Flink task to the {@link SourceReader} to emit records
+ * to downstream operators for message processing.
+ */
+@PublicEvolving
+public interface ReaderOutput<T> extends SourceOutput<T> {
+
+ /**
+ * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(timestamp, null)};
+ *
+ * @param record the record to emit.
+ */
+ @Override
+ void collect(T record);
Review comment:
Is there a reason to override the method in the parent class here?
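For context on the question: in Java, a sub-interface may redeclare an inherited abstract method with `@Override`. This does not change the signature or how calls are dispatched; the usual motivation is to attach sub-interface-specific Javadoc. A self-contained illustration with hypothetical names:

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical parent interface, standing in for SourceOutput. */
interface ParentOutput<T> {
    void collect(T record);
}

/** Redeclares collect() only to carry its own documentation. */
interface ChildOutput<T> extends ParentOutput<T> {
    /** Same contract as the parent; redeclared purely for documentation. */
    @Override
    void collect(T record);

    void flushAll();
}

/** One method implementation satisfies both declarations. */
final class ListOutput<T> implements ChildOutput<T> {
    final List<T> records = new ArrayList<>();
    int flushes = 0;

    @Override
    public void collect(T record) {
        records.add(record);
    }

    @Override
    public void flushAll() {
        flushes++;
    }
}
```

If the Javadoc in `ReaderOutput` adds nothing beyond the parent's, the redeclaration could simply be dropped.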
##########
File path:
flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java
##########
@@ -0,0 +1,85 @@
+
+ /**
+ * Emit a record with timestamp.
+ *
+ * @param record the record to emit.
+ * @param timestamp the timestamp of the record.
+ */
+ @Override
+ void collect(T record, long timestamp);
+
+ /**
+ * Emits the given watermark.
+ *
+ * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending
+ * previously marked idleness.
+ */
+ @Override
+ void emitWatermark(Watermark watermark);
+
+ /**
+ * Marks this output as idle, meaning that downstream operations do not
+ * wait for watermarks from this output.
+ *
+ * <p>An output becomes active again as soon as the next watermark is emitted.
+ */
+ @Override
+ void markIdle();
+
+ /**
+ * Creates a {@code SourceOutput} for a specific Source Split. Use these outputs if you want to
+ * run split-local logic, like watermark generation.
+ * Only one split-local output may be created per split.
+ *
+ * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created
+ * output again. Otherwise it will continue to contribute to the watermark generation like a
+ * perpetually stalling source split, and may hold back the watermark indefinitely.
+ *
+ * @see #releaseOutputForSplit(String)
+ */
+ SourceOutput<T> createOutputForSplit(String splitId);
Review comment:
Should we specify that this method simply returns the existing `SourceOutput` if one has already been created for the given split?
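A minimal sketch of the suggested contract, using a hypothetical `SplitOutputRegistry` (not part of the PR): a second `createOutputForSplit` call for the same split returns the already-registered instance, and releasing the split removes it so a later call creates a fresh one.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

/** Hypothetical registry illustrating "return the existing output if present". */
final class SplitOutputRegistry<O> {
    private final Map<String, O> outputs = new HashMap<>();
    private final Function<String, O> factory;

    SplitOutputRegistry(Function<String, O> factory) {
        this.factory = factory;
    }

    /** Returns the split's existing output, or creates and registers a new one. */
    O createOutputForSplit(String splitId) {
        return outputs.computeIfAbsent(splitId, factory);
    }

    /** Unregisters the split's output; in a real source this is what stops it holding back watermarks. */
    void releaseOutputForSplit(String splitId) {
        outputs.remove(splitId);
    }

    int activeOutputs() {
        return outputs.size();
    }
}
```

The alternative contract, throwing an `IllegalStateException` on a second call, makes accidental double creation visible instead of silently sharing the output.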
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.connector.source.ReaderOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+
+/**
+ * Basic interface for the timestamp extraction and watermark generation logic for the
+ * {@link org.apache.flink.api.connector.source.SourceReader}.
+ *
+ * <p>Implementations of this class may or may not actually perform certain tasks, like watermark
+ * generation. For example, the batch-oriented implementation typically skips all watermark generation
+ * logic.
+ *
+ * @param <T> The type of the emitted records.
+ */
+public interface TimestampsAndWatermarks<T> {
Review comment:
Is this an `@Internal` class? I am wondering when the annotations should be added.
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.Watermark;
+import org.apache.flink.api.common.eventtime.WatermarkGenerator;
+import org.apache.flink.api.common.eventtime.WatermarkOutput;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * Implementation of the SourceOutput. The records emitted to this output are pushed into a given
+ * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or
+ * into a separate {@link WatermarkOutput}, if one is provided.
+ *
+ * <h2>Periodic Watermarks</h2>
+ *
+ * <p>This output does not emit periodic watermarks on its own; {@link #emitPeriodicWatermark()}
+ * needs to be called periodically to trigger them.
+ *
+ * <h2>Note on Performance Considerations</h2>
+ *
+ * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)}
+ * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible,
+ * we want to have only a single implementation of these two methods, across all classes.
+ * That way, the JIT compiler can de-virtualize (and inline) them better.
+ *
+ * <p>Currently, we have one implementation of these methods in the batch case (see class
+ * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM
+ * is dedicated to a single job (or type of job), only one of these classes will be loaded. In mixed
+ * job setups, we still have a bimorphic method (rather than a polymorphic or megamorphic method).
+ *
+ * @param <T> The type of emitted records.
+ */
+public class SourceOutputWithWatermarks<T> implements SourceOutput<T> {
+
+ private final PushingAsyncDataInput.DataOutput<T> recordsOutput;
+
+ private final TimestampAssigner<T> timestampAssigner;
+
+ private final WatermarkGenerator<T> watermarkGenerator;
+
+ private final WatermarkOutput onEventWatermarkOutput;
+
+ private final WatermarkOutput periodicWatermarkOutput;
+
+ /**
+ * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+ * and watermarks to the (possibly different) WatermarkOutput.
+ */
+ protected SourceOutputWithWatermarks(
+ PushingAsyncDataInput.DataOutput<T> recordsOutput,
+ WatermarkOutput onEventWatermarkOutput,
+ WatermarkOutput periodicWatermarkOutput,
+ TimestampAssigner<T> timestampAssigner,
+ WatermarkGenerator<T> watermarkGenerator) {
+
+ this.recordsOutput = checkNotNull(recordsOutput);
+ this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput);
+ this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput);
+ this.timestampAssigner = checkNotNull(timestampAssigner);
+ this.watermarkGenerator = checkNotNull(watermarkGenerator);
+ }
+
+ // ------------------------------------------------------------------------
+ // SourceOutput Methods
+ //
+ // Note that the two methods below are final, as a partial enforcement
+ // of the performance design goal mentioned in the class-level comment.
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final void collect(T record) {
+ collect(record, Long.MIN_VALUE);
Review comment:
Is this a symbolic special value meaning "NO_TIMESTAMP", or might it trigger retraction in the downstream operators?
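In the code shown, `Long.MIN_VALUE` is simply forwarded to `timestampAssigner.extractTimestamp(record, timestamp)`; nothing in this diff interprets it as a retraction. A sketch of making the marker explicit through a named constant (the class and constant names here are hypothetical):

```java
/**
 * Hypothetical illustration of Long.MIN_VALUE as a "no timestamp" marker. It is an ordinary
 * long value; it only means "no timestamp" if every consumer agrees to read it that way.
 */
final class TimestampSentinel {
    /** Hypothetical named constant, so call sites do not use a bare Long.MIN_VALUE. */
    static final long NO_TIMESTAMP = Long.MIN_VALUE;

    private TimestampSentinel() {}

    static boolean hasTimestamp(long timestamp) {
        return timestamp != NO_TIMESTAMP;
    }

    /** Example assigner policy: keep a source-provided timestamp, else use a fallback. */
    static long extractOrDefault(long sourceTimestamp, long fallback) {
        return hasTimestamp(sourceTimestamp) ? sourceTimestamp : fallback;
    }
}
```

One consequence of any in-band sentinel is that a genuine timestamp equal to `Long.MIN_VALUE` becomes indistinguishable from "no timestamp".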
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java
##########
@@ -0,0 +1,175 @@
+ @Override
+ public final void collect(T record) {
+ collect(record, Long.MIN_VALUE);
+ }
+
+ @Override
+ public final void collect(T record, long timestamp) {
+ try {
+ final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp);
+
+ // IMPORTANT: The event must be emitted before the watermark generator is called.
+ recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp));
+ watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput);
+ } catch (ExceptionInChainedOperatorException e) {
+ throw e;
+ } catch (Exception e) {
+ throw new ExceptionInChainedOperatorException(e);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // WatermarkOutput Methods
+ //
+ // These two methods are final as well, to enforce the contract that the
+ // watermarks from emitWatermark(Watermark) go to the same output as the
+ // watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...)
+ // methods.
+ // ------------------------------------------------------------------------
+
+ @Override
+ public final void emitWatermark(Watermark watermark) {
+ onEventWatermarkOutput.emitWatermark(watermark);
+ }
+
+ @Override
+ public final void markIdle() {
+ onEventWatermarkOutput.markIdle();
+ }
+
+ public final void emitPeriodicWatermark() {
+ watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput);
+ }
+
+ // ------------------------------------------------------------------------
+ // Factories
+ // ------------------------------------------------------------------------
+
+ /**
+ * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput
+ * and watermarks to the (possibly different) WatermarkOutput.
Review comment:
Why is it a possibly different `WatermarkOutput`?
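One plausible reason, offered as an assumption rather than the author's stated intent: with per-split outputs, on-event watermarks may need to update the combined watermark immediately, while periodic ones are recorded per split and combined once per interval. The sketch below uses hypothetical classes (not Flink's API, watermarks simplified to `long`) to show one split handing two different outputs to the same generator.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical, simplified watermark sink. */
interface SimpleWatermarkOutput {
    void emitWatermark(long watermark);
}

/**
 * Hypothetical multiplexer: combines per-split watermarks (minimum across splits). Each split
 * has an immediate view, which re-evaluates the combined watermark on every update, and a
 * deferred view, which only records the update until flush() is called.
 */
final class SimpleWatermarkMultiplexer {
    private final Map<String, Long> splitWatermarks = new HashMap<>();
    private final SimpleWatermarkOutput downstream;
    private long lastEmitted = Long.MIN_VALUE;

    SimpleWatermarkMultiplexer(SimpleWatermarkOutput downstream) {
        this.downstream = downstream;
    }

    void registerSplit(String splitId) {
        splitWatermarks.put(splitId, Long.MIN_VALUE);
    }

    /** For on-event watermarks: the combined watermark is re-evaluated right away. */
    SimpleWatermarkOutput immediateOutput(String splitId) {
        return watermark -> {
            update(splitId, watermark);
            flush();
        };
    }

    /** For periodic watermarks: updates wait for the next flush(). */
    SimpleWatermarkOutput deferredOutput(String splitId) {
        return watermark -> update(splitId, watermark);
    }

    /** Emits the minimum across all splits downstream if it advanced. */
    void flush() {
        long combined = Long.MAX_VALUE;
        for (long wm : splitWatermarks.values()) {
            combined = Math.min(combined, wm);
        }
        if (!splitWatermarks.isEmpty() && combined > lastEmitted) {
            lastEmitted = combined;
            downstream.emitWatermark(combined);
        }
    }

    private void update(String splitId, long watermark) {
        // per-split watermarks only move forward
        splitWatermarks.merge(splitId, watermark, Long::max);
    }
}
```

Under this reading, the constructor's `onEventWatermarkOutput` would be the immediate view and `periodicWatermarkOutput` the deferred one for the same split.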
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java
##########
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.operators.source;
+
+import org.apache.flink.api.common.eventtime.RecordTimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssigner;
+import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
+import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
+import org.apache.flink.metrics.MetricGroup;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+
+import java.io.Serializable;
+import java.time.Duration;
+
+/**
+ * A collection of all information relevant to setting up timestamp extraction
and watermark
+ * generation in a data source operator.
+ */
+public final class EventTimeConfig<T> implements Serializable {
Review comment:
`TimestampAndWatermarksFactory` seems a more consistent class name.
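To illustrate the naming point: the class is `Serializable` and, per its Javadoc, carries the information from which timestamp extraction and watermark generation are set up in the operator, which is the role of a factory. A heavily simplified, hypothetical sketch (the class names and the fixed-offset "assigner" are invented for illustration):

```java
import java.io.Serializable;
import java.util.function.LongUnaryOperator;

/** Hypothetical runtime component: not serializable, created on the task side. */
final class RuntimeTimestamps {
    private final LongUnaryOperator assigner;

    RuntimeTimestamps(LongUnaryOperator assigner) {
        this.assigner = assigner;
    }

    long assign(long sourceTimestamp) {
        return assigner.applyAsLong(sourceTimestamp);
    }
}

/** Hypothetical serializable factory: ships configuration, creates the runtime component. */
final class TimestampsFactory implements Serializable {
    private static final long serialVersionUID = 1L;

    private final long offsetMillis;

    TimestampsFactory(long offsetMillis) {
        this.offsetMillis = offsetMillis;
    }

    /** Called where the component is needed, after the factory has been deserialized. */
    RuntimeTimestamps create() {
        final long offset = offsetMillis;
        return new RuntimeTimestamps(ts -> ts + offset);
    }
}
```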