StephanEwen commented on a change in pull request #12306: URL: https://github.com/apache/flink/pull/12306#discussion_r430354715
########## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java ########## @@ -0,0 +1,85 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.eventtime.Watermark; + +/** + * The interface provided by Flink task to the {@link SourceReader} to emit records + * to downstream operators for message processing. + */ +@PublicEvolving +public interface ReaderOutput<T> extends SourceOutput<T> { + + /** + * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}. + * + * @param record the record to emit. + */ + @Override + void collect(T record); Review comment: I often do that to spare the user from navigating to parent classes to see methods and read JavaDocs. But it is purely cosmetic. I don't feel strongly about this. ########## File path: flink-core/src/main/java/org/apache/flink/api/connector/source/ReaderOutput.java ########## @@ -0,0 +1,85 @@ +/* + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +package org.apache.flink.api.connector.source; + +import org.apache.flink.annotation.PublicEvolving; +import org.apache.flink.api.common.eventtime.Watermark; + +/** + * The interface provided by Flink task to the {@link SourceReader} to emit records + * to downstream operators for message processing. + */ +@PublicEvolving +public interface ReaderOutput<T> extends SourceOutput<T> { + + /** + * Emit a record without a timestamp. Equivalent to {@link #collect(Object, long) collect(record, Long.MIN_VALUE)}. + * + * @param record the record to emit. + */ + @Override + void collect(T record); + + /** + * Emit a record with timestamp. + * + * @param record the record to emit. + * @param timestamp the timestamp of the record. + */ + @Override + void collect(T record, long timestamp); + + /** + * Emits the given watermark. + * + * <p>Emitting a watermark also implicitly marks the stream as <i>active</i>, ending + * previously marked idleness. + */ + @Override + void emitWatermark(Watermark watermark); + + /** + * Marks this output as idle, meaning that downstream operations do not + * wait for watermarks from this output. + * + * <p>An output becomes active again as soon as the next watermark is emitted. + */ + @Override + void markIdle(); + + /** + * Creates a {@code SourceOutput} for a specific Source Split. 
Use these outputs if you want to + * run split-local logic, like watermark generation. + * Only one split-local output may be created per split. + * + * <p><b>IMPORTANT:</b> After the split has been finished, it is crucial to release the created + * output again. Otherwise it will continue to contribute to the watermark generation like a + * perpetually stalling source split, and may hold back the watermark indefinitely. + * + * @see #releaseOutputForSplit(String) + */ + SourceOutput<T> createOutputForSplit(String splitId); Review comment: Yes, that is better. Will change this. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.source; + +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of the SourceOutput. The records emitted to this output are pushed into a given + * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or + * into a separate {@link WatermarkOutput}, if one is provided. + * + * <h2>Periodic Watermarks</h2> + * + * <p>This output does not schedule periodic watermarks by itself. Periodic watermarks are only + * emitted when {@link #emitPeriodicWatermark()} is called. + * <h2>Note on Performance Considerations</h2> + * + * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)} + * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible, + * we want to have only a single implementation of these two methods, across all classes. + * That way, the JIT compiler can de-virtualize (and inline) them better. + * + * <p>Currently, we have one implementation of these methods in the batch case (see class + * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM + * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed + * job setups, we still have a bimorphic method (rather than a poly-/mega-morphic method). + * + * @param <T> The type of emitted records.
+ */ +public class SourceOutputWithWatermarks<T> implements SourceOutput<T> { + + private final PushingAsyncDataInput.DataOutput<T> recordsOutput; + + private final TimestampAssigner<T> timestampAssigner; + + private final WatermarkGenerator<T> watermarkGenerator; + + private final WatermarkOutput onEventWatermarkOutput; + + private final WatermarkOutput periodicWatermarkOutput; + + /** + * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput + * and watermarks to the (possibly different) WatermarkOutput. + */ + protected SourceOutputWithWatermarks( + PushingAsyncDataInput.DataOutput<T> recordsOutput, + WatermarkOutput onEventWatermarkOutput, + WatermarkOutput periodicWatermarkOutput, + TimestampAssigner<T> timestampAssigner, + WatermarkGenerator<T> watermarkGenerator) { + + this.recordsOutput = checkNotNull(recordsOutput); + this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput); + this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput); + this.timestampAssigner = checkNotNull(timestampAssigner); + this.watermarkGenerator = checkNotNull(watermarkGenerator); + } + + // ------------------------------------------------------------------------ + // SourceOutput Methods + // + // Note that the two methods below are final, as a partial enforcement + // of the performance design goal mentioned in the class-level comment. + // ------------------------------------------------------------------------ + + @Override + public final void collect(T record) { + collect(record, Long.MIN_VALUE); Review comment: Yes, it is, see comments in https://github.com/apache/flink/blob/master/flink-core/src/main/java/org/apache/flink/api/common/eventtime/TimestampAssigner.java Will replace this with a symbol to not have this "magic value" there. 
########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/SourceOutputWithWatermarks.java ########## @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.source; + +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.Watermark; +import org.apache.flink.api.common.eventtime.WatermarkGenerator; +import org.apache.flink.api.common.eventtime.WatermarkOutput; +import org.apache.flink.api.connector.source.SourceOutput; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.ExceptionInChainedOperatorException; + +import static org.apache.flink.util.Preconditions.checkNotNull; + +/** + * Implementation of the SourceOutput. The records emitted to this output are pushed into a given + * {@link PushingAsyncDataInput.DataOutput}. The watermarks are pushed into the same output, or + * into a separate {@link WatermarkOutput}, if one is provided. 
+ * + * <h2>Periodic Watermarks</h2> + * + * <p>This output does not schedule periodic watermarks by itself. Periodic watermarks are only + * emitted when {@link #emitPeriodicWatermark()} is called. + * <h2>Note on Performance Considerations</h2> + * + * <p>The methods {@link SourceOutput#collect(Object)} and {@link SourceOutput#collect(Object, long)} + * are highly performance-critical (part of the hot loop). To make the code as JIT friendly as possible, + * we want to have only a single implementation of these two methods, across all classes. + * That way, the JIT compiler can de-virtualize (and inline) them better. + * + * <p>Currently, we have one implementation of these methods in the batch case (see class + * {@link BatchTimestampsAndWatermarks}) and one for the streaming case (this class). When the JVM + * is dedicated to a single job (or type of job) only one of these classes will be loaded. In mixed + * job setups, we still have a bimorphic method (rather than a poly-/mega-morphic method). + * + * @param <T> The type of emitted records. + */ +public class SourceOutputWithWatermarks<T> implements SourceOutput<T> { + + private final PushingAsyncDataInput.DataOutput<T> recordsOutput; + + private final TimestampAssigner<T> timestampAssigner; + + private final WatermarkGenerator<T> watermarkGenerator; + + private final WatermarkOutput onEventWatermarkOutput; + + private final WatermarkOutput periodicWatermarkOutput; + + /** + * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput + * and watermarks to the (possibly different) WatermarkOutput.
+ */ + protected SourceOutputWithWatermarks( + PushingAsyncDataInput.DataOutput<T> recordsOutput, + WatermarkOutput onEventWatermarkOutput, + WatermarkOutput periodicWatermarkOutput, + TimestampAssigner<T> timestampAssigner, + WatermarkGenerator<T> watermarkGenerator) { + + this.recordsOutput = checkNotNull(recordsOutput); + this.onEventWatermarkOutput = checkNotNull(onEventWatermarkOutput); + this.periodicWatermarkOutput = checkNotNull(periodicWatermarkOutput); + this.timestampAssigner = checkNotNull(timestampAssigner); + this.watermarkGenerator = checkNotNull(watermarkGenerator); + } + + // ------------------------------------------------------------------------ + // SourceOutput Methods + // + // Note that the two methods below are final, as a partial enforcement + // of the performance design goal mentioned in the class-level comment. + // ------------------------------------------------------------------------ + + @Override + public final void collect(T record) { + collect(record, Long.MIN_VALUE); + } + + @Override + public final void collect(T record, long timestamp) { + try { + final long assignedTimestamp = timestampAssigner.extractTimestamp(record, timestamp); + + // IMPORTANT: The event must be emitted before the watermark generator is called. + recordsOutput.emitRecord(new StreamRecord<>(record, assignedTimestamp)); + watermarkGenerator.onEvent(record, assignedTimestamp, onEventWatermarkOutput); + } catch (ExceptionInChainedOperatorException e) { + throw e; + } catch (Exception e) { + throw new ExceptionInChainedOperatorException(e); + } + } + + // ------------------------------------------------------------------------ + // WatermarkOutput Methods + // + // These two methods are final as well, to enforce the contract that the + // watermarks from emitWatermark(Watermark) go to the same output as the + // watermarks from the watermarkGenerator.onEvent(...) calls in the collect(...) + // methods. 
+ // ------------------------------------------------------------------------ + + @Override + public final void emitWatermark(Watermark watermark) { + onEventWatermarkOutput.emitWatermark(watermark); + } + + @Override + public final void markIdle() { + onEventWatermarkOutput.markIdle(); + } + + public final void emitPeriodicWatermark() { + watermarkGenerator.onPeriodicEmit(periodicWatermarkOutput); + } + + // ------------------------------------------------------------------------ + // Factories + // ------------------------------------------------------------------------ + + /** + * Creates a new SourceOutputWithWatermarks that emits records to the given DataOutput + * and watermarks to the (possibly different) WatermarkOutput. Review comment: For the per-partition watermarking. The records flow directly downstream, but watermarks go to the `WatermarkOutputMultiplexer` which merges them. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/TimestampsAndWatermarks.java ########## @@ -0,0 +1,41 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.streaming.api.operators.source; + +import org.apache.flink.api.connector.source.ReaderOutput; +import org.apache.flink.streaming.runtime.io.PushingAsyncDataInput; + +/** + * Basic interface for the timestamp extraction and watermark generation logic for the + * {@link org.apache.flink.api.connector.source.SourceReader}. + * + * <p>Implementations of this class may or may not actually perform certain tasks, like watermark + * generation. For example, the batch-oriented implementation typically skips all watermark generation + * logic. + * + * @param <T> The type of the emitted records. + */ +public interface TimestampsAndWatermarks<T> { Review comment: Good point, I will annotate all classes in that package with `@Internal`. ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/SourceOperatorFactory.java ########## @@ -61,9 +67,11 @@ public SourceOperatorFactory(Source<OUT, ?, ?> source, int numCoordinatorWorkerT final SourceOperator<OUT, ?> sourceOperator = instantiateSourceOperator( source::createReader, gateway, - source.getSplitSerializer()); + source.getSplitSerializer(), + eventTimeConfig); sourceOperator.setup(parameters.getContainingTask(), parameters.getStreamConfig(), parameters.getOutput()); + sourceOperator.processingTimeService = parameters.getProcessingTimeService(); Review comment: Will do ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.source; + +import org.apache.flink.api.common.eventtime.RecordTimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import java.io.Serializable; +import java.time.Duration; + +/** + * A collection of all information relevant to setting up timestamp extraction and watermark + * generation in a data source operator. + */ +public final class EventTimeConfig<T> implements Serializable { Review comment: Maybe. I think this may actually serve a "config-style" purpose (at least if configs can be more than key/value pairs). ########## File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/source/EventTimeConfig.java ########## @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.streaming.api.operators.source; + +import org.apache.flink.api.common.eventtime.RecordTimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssigner; +import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier; +import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier; +import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService; + +import java.io.Serializable; +import java.time.Duration; + +/** + * A collection of all information relevant to setting up timestamp extraction and watermark + * generation in a data source operator. + */ +public final class EventTimeConfig<T> implements Serializable { Review comment: Maybe. I think this may actually serve a "config-style" purpose (at least if Configs may be more than key/value pairs). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected]
