This is an automated email from the ASF dual-hosted git repository. guoweijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d7c87f23fa32addfbc2ae21dd9645dd29f5684b8 Author: yunfengzhou-hub <[email protected]> AuthorDate: Mon Sep 30 15:49:38 2024 +0800 [FLINK-36355][runtime] Remove deprecated TimeCharacteristic --- .../file/table/stream/StreamingFileWriterTest.java | 2 - .../flink/api/common/state/StateTtlConfig.java | 5 +- .../datastream/impl/ExecutionEnvironmentImpl.java | 7 +- .../python/datastream/data_stream_job.py | 3 +- .../ContinuousFileProcessingMigrationTest.java | 3 - .../hdfstests/ContinuousFileProcessingTest.java | 191 --------------------- .../stream_execution_environment.rst | 2 - .../docs/reference/pyflink.datastream/timer.rst | 11 -- flink-python/pyflink/datastream/__init__.py | 5 - flink-python/pyflink/datastream/functions.py | 15 +- .../datastream/stream_execution_environment.py | 35 ---- .../pyflink/datastream/tests/test_data_stream.py | 6 +- .../pyflink/datastream/time_characteristic.py | 95 ---------- .../flink/streaming/api/TimeCharacteristic.java | 98 ----------- .../environment/StreamExecutionEnvironment.java | 54 ------ .../api/functions/KeyedProcessFunction.java | 3 +- .../streaming/api/functions/ProcessFunction.java | 3 +- .../functions/co/BaseBroadcastProcessFunction.java | 3 +- .../api/functions/co/CoProcessFunction.java | 3 +- .../api/functions/co/KeyedCoProcessFunction.java | 3 +- .../source/ContinuousFileReaderOperator.java | 1 - .../flink/streaming/api/graph/StreamConfig.java | 18 -- .../flink/streaming/api/graph/StreamGraph.java | 11 -- .../streaming/api/graph/StreamGraphGenerator.java | 12 -- .../api/graph/StreamingJobGraphGenerator.java | 2 - .../streaming/api/operators/StreamSource.java | 4 - .../api/operators/StreamSourceContexts.java | 51 +----- .../flink/streaming/runtime/tasks/StreamTask.java | 5 +- .../source/ContinuousFileReaderOperatorTest.java | 3 - .../StreamSourceContextIdleDetectionTests.java | 173 ------------------- .../ContinuousFileProcessingRescalingTest.java | 2 - .../StreamSourceOperatorLatencyMetricsTest.java | 2 - .../windowing/TimeWindowTranslationTest.java | 4 - .../tasks/InterruptSensitiveRestoreTest.java | 2 - .../util/AbstractStreamOperatorTestHarness.java | 9 - .../AbstractUdfStreamOperatorLifecycleTest.java | 3 - .../StreamSourceOperatorWatermarksTest.java | 100 ----------- .../runtime/tasks/SourceStreamTaskTest.java | 7 - .../tasks/StreamTaskMailboxTestHarnessBuilder.java | 3 - .../runtime/tasks/StreamTaskSystemExitTest.java | 2 - .../streaming/runtime/tasks/StreamTaskTest.java | 5 - .../runtime/tasks/StreamTaskTestHarness.java | 2 - .../AbstractStreamTableEnvironmentImpl.java | 9 +- .../java/internal/StreamTableEnvironmentImpl.java | 8 - .../internal/StreamTableEnvironmentImpl.scala | 17 -- .../utils/DummyStreamExecutionEnvironment.java | 12 -- .../runtime/stream/sql/AggregateITCase.scala | 5 - .../planner/runtime/stream/sql/JoinITCase.scala | 5 - .../sql/TemporalTableFunctionJoinITCase.scala | 6 - .../flink/table/planner/utils/TableTestBase.scala | 13 +- .../streaming/runtime/StreamTaskTimerITCase.java | 27 --- .../test/streaming/runtime/TimestampITCase.java | 28 --- 52 files changed, 29 insertions(+), 1069 deletions(-) diff --git a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java index 935dff6f780..22cd3f6447a 100644 --- a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java +++ b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java @@ -24,7 +24,6 @@ import org.apache.flink.connector.file.table.FileSystemTableSink; import org.apache.flink.core.fs.Path; import org.apache.flink.core.io.SimpleVersionedSerializer; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner; import org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig; import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy; @@ -375,7 +374,6 @@ class StreamingFileWriterTest { conf); OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> harness = new OneInputStreamOperatorTestHarness<>(writer, 1, 1, 0); - harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime); return harness; } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java index 0ba3ea00e1f..61f0f4eccfe 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java @@ -84,10 +84,7 @@ public class StateTtlConfig implements Serializable { /** This option configures time scale to use for ttl. */ @PublicEvolving public enum TtlTimeCharacteristic { - /** - * Processing time, see also <code> - * org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>. - */ + /** Processing time. */ ProcessingTime } diff --git a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java index fd68cf872f9..b9a1fe045fb 100644 --- a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java +++ b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java @@ -59,7 +59,6 @@ import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; -import static org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC; import static org.apache.flink.util.Preconditions.checkNotNull; /** @@ -328,11 +327,7 @@ public class ExecutionEnvironmentImpl implements ExecutionEnvironment { // We copy the transformation so that newly added transformations cannot intervene with the // stream graph generation. return new StreamGraphGenerator( - new ArrayList<>(transformations), - executionConfig, - checkpointCfg, - configuration) - .setTimeCharacteristic(DEFAULT_TIME_CHARACTERISTIC); + new ArrayList<>(transformations), executionConfig, checkpointCfg, configuration); } private PipelineExecutor getPipelineExecutor() throws Exception { diff --git a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py index 880c71e3145..5c85a5756af 100644 --- a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py +++ b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py @@ -22,7 +22,7 @@ from pyflink.common import Duration from pyflink.common.serialization import SimpleStringSchema from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import TimestampAssigner, WatermarkStrategy -from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic +from pyflink.datastream import StreamExecutionEnvironment from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream.formats.json import JsonRowDeserializationSchema from pyflink.datastream.functions import KeyedProcessFunction @@ -36,7 +36,6 @@ def python_data_stream_example(): # are processed by the same worker and the collected result would be in order which is good for # assertion. env.set_parallelism(1) - env.set_stream_time_characteristic(TimeCharacteristic.EventTime) type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 'payPlatform', 'provinceId'], [Types.LONG(), Types.LONG(), Types.DOUBLE(), Types.INT(), diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java index 2394c3e1826..6505ca96d3f 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java @@ -28,7 +28,6 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; @@ -117,7 +116,6 @@ public class ContinuousFileProcessingMigrationTest implements MigrationTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = createHarness(format); - testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); testHarness.open(); // create some state in the reader testHarness.processElement(new StreamRecord<>(split1)); @@ -152,7 +150,6 @@ public class ContinuousFileProcessingMigrationTest implements MigrationTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> testHarness = createHarness(format); - testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); testHarness.setup(); diff --git a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java index d32d63bc06e..92ceed2322c 100644 --- a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java +++ b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java @@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory; import org.apache.flink.streaming.api.functions.source.FileProcessingMode; @@ -41,15 +40,11 @@ import org.apache.flink.streaming.api.legacy.io.TextInputFormat; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; -import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor; -import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction; -import org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor; import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness; import org.apache.flink.streaming.util.MockStreamingRuntimeContext; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; import org.apache.flink.util.OperatingSystem; import org.apache.flink.util.Preconditions; -import org.apache.flink.util.function.RunnableWithException; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileSystem; @@ -75,7 +70,6 @@ import java.util.Map; import java.util.Set; import java.util.TreeSet; import java.util.UUID; -import java.util.concurrent.ConcurrentLinkedQueue; /** * Tests for the {@link ContinuousFileMonitoringFunction} and {@link ContinuousFileReaderOperator}. @@ -166,180 +160,6 @@ public class ContinuousFileProcessingTest { } } - @Test - public void testFileReadingOperatorWithIngestionTime() throws Exception { - String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; - - Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>(); - Map<Integer, String> expectedFileContents = new HashMap<>(); - Map<String, Long> modTimes = new HashMap<>(); - for (int i = 0; i < NO_OF_FILES; i++) { - Tuple2<org.apache.hadoop.fs.Path, String> file = - createFileAndFillWithData(testBasePath, "file", i, "This is test line."); - filesCreated.add(file.f0); - modTimes.put(file.f0.getName(), hdfs.getFileStatus(file.f0).getModificationTime()); - expectedFileContents.put(i, file.f1); - } - - TextInputFormat format = new TextInputFormat(new Path(testBasePath)); - - final long watermarkInterval = 10; - - final OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = - createHarness(format); - SteppingMailboxProcessor localMailbox = createLocalMailbox(tester); - - tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval); - tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime); - - tester.open(); - Assert.assertEquals(TimeCharacteristic.IngestionTime, tester.getTimeCharacteristic()); - - tester.setProcessingTime(201); - - // test that watermarks are correctly emitted - ConcurrentLinkedQueue<Object> output = tester.getOutput(); - while (output.isEmpty()) { - localMailbox.runMailboxStep(); - } - Assert.assertTrue(output.toString(), output.peek() instanceof Watermark); - Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp()); - - tester.setProcessingTime(301); - Assert.assertTrue(output.peek() instanceof Watermark); - Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp()); - - tester.setProcessingTime(401); - Assert.assertTrue(output.peek() instanceof Watermark); - Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp()); - - tester.setProcessingTime(501); - Assert.assertTrue(output.peek() instanceof Watermark); - Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp()); - - Assert.assertTrue(output.isEmpty()); - - // create the necessary splits for the test - FileInputSplit[] splits = - format.createInputSplits(tester.getExecutionConfig().getParallelism()); - - // and feed them to the operator - Map<Integer, List<String>> actualFileContents = new HashMap<>(); - - long lastSeenWatermark = Long.MIN_VALUE; - int lineCounter = 0; // counter for the lines read from the splits - int watermarkCounter = 0; - - for (FileInputSplit split : splits) { - - // set the next "current processing time". - long nextTimestamp = tester.getProcessingTime() + watermarkInterval; - tester.setProcessingTime(nextTimestamp); - - // send the next split to be read and wait until it is fully read, the +1 is for the - // watermark. - RunnableWithException runnableWithException = - () -> - tester.processElement( - new StreamRecord<>( - new TimestampedFileInputSplit( - modTimes.get(split.getPath().getName()), - split.getSplitNumber(), - split.getPath(), - split.getStart(), - split.getLength(), - split.getHostnames()))); - runnableWithException.run(); - - // NOTE: the following check works because each file fits in one split. - // In other case it would fail and wait forever. - // BUT THIS IS JUST FOR THIS TEST - while (tester.getOutput().isEmpty() - || tester.getOutput().size() != (LINES_PER_FILE + 1)) { - localMailbox.runMailboxStep(); - } - - // verify that the results are the expected - for (Object line : tester.getOutput()) { - - if (line instanceof StreamRecord) { - - @SuppressWarnings("unchecked") - StreamRecord<String> element = (StreamRecord<String>) line; - lineCounter++; - - Assert.assertEquals(nextTimestamp, element.getTimestamp()); - - int fileIdx = Character.getNumericValue(element.getValue().charAt(0)); - List<String> content = actualFileContents.get(fileIdx); - if (content == null) { - content = new ArrayList<>(); - actualFileContents.put(fileIdx, content); - } - content.add(element.getValue() + "\n"); - } else if (line instanceof Watermark) { - long watermark = ((Watermark) line).getTimestamp(); - - Assert.assertEquals( - nextTimestamp - (nextTimestamp % watermarkInterval), watermark); - Assert.assertTrue(watermark > lastSeenWatermark); - watermarkCounter++; - - lastSeenWatermark = watermark; - } else { - Assert.fail("Unknown element in the list."); - } - } - - // clean the output to be ready for the next split - tester.getOutput().clear(); - } - - // now we are processing one split after the other, - // so all the elements must be here by now. - Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter); - - // because we expect one watermark per split. - Assert.assertEquals(splits.length, watermarkCounter); - - // then close the reader gracefully so that the Long.MAX watermark is emitted - synchronized (tester.getCheckpointLock()) { - tester.close(); - } - - for (org.apache.hadoop.fs.Path file : filesCreated) { - hdfs.delete(file, false); - } - - // check if the last element is the LongMax watermark (by now this must be the only element) - Assert.assertEquals(1, tester.getOutput().size()); - Assert.assertTrue(tester.getOutput().peek() instanceof Watermark); - Assert.assertEquals(Long.MAX_VALUE, ((Watermark) tester.getOutput().poll()).getTimestamp()); - - // check if the elements are the expected ones. - Assert.assertEquals(expectedFileContents.size(), actualFileContents.size()); - for (Integer fileIdx : expectedFileContents.keySet()) { - Assert.assertTrue( - "file" + fileIdx + " not found", actualFileContents.keySet().contains(fileIdx)); - - List<String> cntnt = actualFileContents.get(fileIdx); - Collections.sort( - cntnt, - new Comparator<String>() { - @Override - public int compare(String o1, String o2) { - return getLineNo(o1) - getLineNo(o2); - } - }); - - StringBuilder cntntStr = new StringBuilder(); - for (String line : cntnt) { - cntntStr.append(line); - } - Assert.assertEquals(expectedFileContents.get(fileIdx), cntntStr.toString()); - } - } - private <T> OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, T> createHarness( FileInputFormat<T> format) throws Exception { ExecutionConfig config = new ExecutionConfig(); @@ -350,14 +170,6 @@ public class ContinuousFileProcessingTest { .createSerializer(config.getSerializerConfig())); } - private SteppingMailboxProcessor createLocalMailbox( - OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness) { - return new SteppingMailboxProcessor( - MailboxDefaultAction.Controller::suspendDefaultAction, - harness.getTaskMailbox(), - StreamTaskActionExecutor.IMMEDIATE); - } - @Test public void testFileReadingOperatorWithEventTime() throws Exception { String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/"; @@ -378,7 +190,6 @@ public class ContinuousFileProcessingTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> tester = createHarness(format); - tester.setTimeCharacteristic(TimeCharacteristic.EventTime); tester.open(); // create the necessary splits for the test @@ -483,7 +294,6 @@ public class ContinuousFileProcessingTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> initTestInstance = createHarness(format); - initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); initTestInstance.open(); // create some state in the reader @@ -504,7 +314,6 @@ public class ContinuousFileProcessingTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, FileInputSplit> restoredTestInstance = createHarness(new BlockingFileInputFormat(latch, new Path(testBasePath))); - restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime); restoredTestInstance.initializeState(snapshot); restoredTestInstance.open(); diff --git a/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst b/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst index ea8be1925d3..5680dc50f26 100644 --- a/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst +++ b/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst @@ -55,8 +55,6 @@ access). StreamExecutionEnvironment.is_changelog_state_backend_enabled StreamExecutionEnvironment.set_default_savepoint_directory StreamExecutionEnvironment.get_default_savepoint_directory - StreamExecutionEnvironment.set_stream_time_characteristic - StreamExecutionEnvironment.get_stream_time_characteristic StreamExecutionEnvironment.configure StreamExecutionEnvironment.add_python_file StreamExecutionEnvironment.set_python_requirements diff --git a/flink-python/docs/reference/pyflink.datastream/timer.rst b/flink-python/docs/reference/pyflink.datastream/timer.rst index cc1ee50709e..88da53d76ce 100644 --- a/flink-python/docs/reference/pyflink.datastream/timer.rst +++ b/flink-python/docs/reference/pyflink.datastream/timer.rst @@ -37,17 +37,6 @@ TimerService TimerService.delete_event_time_timer -TimeCharacteristic ------------------- - -.. currentmodule:: pyflink.datastream.time_characteristic - -.. autosummary:: - :toctree: api/ - - TimeCharacteristic - - TimeDomain ---------- diff --git a/flink-python/pyflink/datastream/__init__.py b/flink-python/pyflink/datastream/__init__.py index fce9563b7fc..f98fe98dc96 100644 --- a/flink-python/pyflink/datastream/__init__.py +++ b/flink-python/pyflink/datastream/__init__.py @@ -243,9 +243,6 @@ Classes to define formats used together with source & sink: Other important classes: - - :class:`TimeCharacteristic`: - Defines how the system determines time for time-dependent order and operations that depend - on time (such as time windows). - :class:`TimeDomain`: Specifies whether a firing timer is based on event time or processing time. - :class:`KeySelector`: @@ -280,7 +277,6 @@ from pyflink.datastream.checkpoint_storage import (CheckpointStorage, JobManager FileSystemCheckpointStorage, CustomCheckpointStorage) from pyflink.datastream.stream_execution_environment import StreamExecutionEnvironment -from pyflink.datastream.time_characteristic import TimeCharacteristic from pyflink.datastream.time_domain import TimeDomain from pyflink.datastream.functions import ProcessFunction from pyflink.datastream.timerservice import TimerService @@ -335,7 +331,6 @@ __all__ = [ 'MergingWindowAssigner', 'TriggerResult', 'Trigger', - 'TimeCharacteristic', 'TimeDomain', 'KeySelector', 'Partitioner', diff --git a/flink-python/pyflink/datastream/functions.py b/flink-python/pyflink/datastream/functions.py index 29f38617f92..69e193fa167 100644 --- a/flink-python/pyflink/datastream/functions.py +++ b/flink-python/pyflink/datastream/functions.py @@ -645,8 +645,7 @@ class ProcessFunction(Function): """ Timestamp of the element currently being processed or timestamp of a firing timer. - This might be None, for example if the time characteristic of your program is set to - TimeCharacteristic.ProcessTime. + This might be None, depending on the stream's watermark strategy. """ pass @@ -697,8 +696,7 @@ class KeyedProcessFunction(Function): """ Timestamp of the element currently being processed or timestamp of a firing timer. - This might be None, for example if the time characteristic of your program is set to - TimeCharacteristic.ProcessTime. + This might be None, depending on the stream's watermark strategy. """ pass @@ -772,8 +770,7 @@ class CoProcessFunction(Function): """ Timestamp of the element currently being processed or timestamp of a firing timer. - This might be None, for example if the time characteristic of your program is set to - TimeCharacteristic.ProcessTime. + This might be None, depending on the stream's watermark strategy. """ pass @@ -843,8 +840,7 @@ register a timer that will trigger an action in the future. """ Timestamp of the element currently being processed or timestamp of a firing timer. - This might be None, for example if the time characteristic of your program is set to - TimeCharacteristic.ProcessTime. + This might be None, depending on the stream's watermark strategy. """ pass @@ -1347,8 +1343,7 @@ class BaseBroadcastProcessFunction(Function): def timestamp(self) -> int: """ Timestamp of the element currently being processed or timestamp of a firing timer. - This might be None, for example if the time characteristic of your program is - set to :attr:`TimeCharacteristic.ProcessingTime`. + This might be None, depending on the stream's watermark strategy. """ pass diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py b/flink-python/pyflink/datastream/stream_execution_environment.py index 3e507d1be56..00e639ec379 100644 --- a/flink-python/pyflink/datastream/stream_execution_environment.py +++ b/flink-python/pyflink/datastream/stream_execution_environment.py @@ -35,7 +35,6 @@ from pyflink.datastream.connectors import Source from pyflink.datastream.data_stream import DataStream from pyflink.datastream.execution_mode import RuntimeExecutionMode from pyflink.datastream.functions import SourceFunction -from pyflink.datastream.time_characteristic import TimeCharacteristic from pyflink.datastream.utils import ResultTypeQueryable from pyflink.java_gateway import get_gateway from pyflink.serializers import PickleSerializer @@ -375,40 +374,6 @@ class StreamExecutionEnvironment(object): else: return j_path.toString() - def set_stream_time_characteristic(self, characteristic: TimeCharacteristic): - """ - Sets the time characteristic for all streams create from this environment, e.g., processing - time, event time, or ingestion time. - - If you set the characteristic to IngestionTime of EventTime this will set a default - watermark update interval of 200 ms. If this is not applicable for your application - you should change it using - :func:`pyflink.common.ExecutionConfig.set_auto_watermark_interval`. - - Example: - :: - - >>> env.set_stream_time_characteristic(TimeCharacteristic.EventTime) - - :param characteristic: The time characteristic, which could be - :data:`TimeCharacteristic.ProcessingTime`, - :data:`TimeCharacteristic.IngestionTime`, - :data:`TimeCharacteristic.EventTime`. - """ - j_characteristic = TimeCharacteristic._to_j_time_characteristic(characteristic) - self._j_stream_execution_environment.setStreamTimeCharacteristic(j_characteristic) - - def get_stream_time_characteristic(self) -> 'TimeCharacteristic': - """ - Gets the time characteristic. - - .. seealso:: :func:`set_stream_time_characteristic` - - :return: The :class:`TimeCharacteristic`. - """ - j_characteristic = self._j_stream_execution_environment.getStreamTimeCharacteristic() - return TimeCharacteristic._from_j_time_characteristic(j_characteristic) - def configure(self, configuration: Configuration): """ Sets all relevant options contained in the :class:`~pyflink.common.Configuration`. such as diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py b/flink-python/pyflink/datastream/tests/test_data_stream.py index 8d8de717639..d696c492f41 100644 --- a/flink-python/pyflink/datastream/tests/test_data_stream.py +++ b/flink-python/pyflink/datastream/tests/test_data_stream.py @@ -26,8 +26,8 @@ from pyflink.common import Row, Configuration from pyflink.common.time import Time from pyflink.common.typeinfo import Types from pyflink.common.watermark_strategy import WatermarkStrategy, TimestampAssigner -from pyflink.datastream import (TimeCharacteristic, RuntimeContext, SlotSharingGroup, - StreamExecutionEnvironment, RuntimeExecutionMode) +from pyflink.datastream import (RuntimeContext, SlotSharingGroup, StreamExecutionEnvironment, + RuntimeExecutionMode) from pyflink.datastream.data_stream import DataStream from pyflink.datastream.functions import (AggregateFunction, CoMapFunction, CoFlatMapFunction, MapFunction, FilterFunction, FlatMapFunction, @@ -122,7 +122,6 @@ class DataStreamTests(object): def test_keyed_process_function_with_state(self): self.env.get_config().set_auto_watermark_interval(2000) - self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime) data_stream = self.env.from_collection([(1, 'hi', '1603708211000'), (2, 'hello', '1603708224000'), (3, 'hi', '1603708226000'), @@ -1090,7 +1089,6 @@ class ProcessDataStreamTests(DataStreamTests): def test_process_function(self): self.env.set_parallelism(1) self.env.get_config().set_auto_watermark_interval(2000) - self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime) data_stream = self.env.from_collection([(1, '1603708211000'), (2, '1603708224000'), (3, '1603708226000'), diff --git a/flink-python/pyflink/datastream/time_characteristic.py b/flink-python/pyflink/datastream/time_characteristic.py deleted file mode 100644 index 9187b39f382..00000000000 --- a/flink-python/pyflink/datastream/time_characteristic.py +++ /dev/null @@ -1,95 +0,0 @@ -################################################################################ -# Licensed to the Apache Software Foundation (ASF) under one -# or more contributor license agreements. See the NOTICE file -# distributed with this work for additional information -# regarding copyright ownership. The ASF licenses this file -# to you under the Apache License, Version 2.0 (the -# "License"); you may not use this file except in compliance -# with the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. -################################################################################ -from enum import Enum - -from pyflink.java_gateway import get_gateway - -__all__ = ['TimeCharacteristic'] - - -class TimeCharacteristic(Enum): - """ - The time characteristic defines how the system determines time for time-dependent - order and operations that depend on time (such as time windows). - - :data:`ProcessingTime`: - - Processing time for operators means that the operator uses the system clock of the machine - to determine the current time of the data stream. Processing-time windows trigger based - on wall-clock time and include whatever elements happen to have arrived at the operator at - that point in time. - - Using processing time for window operations results in general in quite non-deterministic - results, because the contents of the windows depends on the speed in which elements arrive. - It is, however, the cheapest method of forming windows and the method that introduces the - least latency. - - :data:`IngestionTime`: - - Ingestion time means that the time of each individual element in the stream is determined - when the element enters the Flink streaming data flow. Operations like windows group the - elements based on that time, meaning that processing speed within the streaming dataflow - does not affect windowing, but only the speed at which sources receive elements. - - Ingestion time is often a good compromise between processing time and event time. - It does not need any special manual form of watermark generation, and events are typically - not too much out-or-order when they arrive at operators; in fact, out-of-orderness can - only be introduced by streaming shuffles or split/join/union operations. The fact that - elements are not very much out-of-order means that the latency increase is moderate, - compared to event time. - - :data:`EventTime`: - - Event time means that the time of each individual element in the stream (also called event) - is determined by the event's individual custom timestamp. These timestamps either exist in - the elements from before they entered the Flink streaming dataflow, or are user-assigned at - the sources. The big implication of this is that it allows for elements to arrive in the - sources and in all operators out of order, meaning that elements with earlier timestamps may - arrive after elements with later timestamps. - - Operators that window or order data with respect to event time must buffer data until they - can be sure that all timestamps for a certain time interval have been received. This is - handled by the so called "time watermarks". - - Operations based on event time are very predictable - the result of windowing operations - is typically identical no matter when the window is executed and how fast the streams - operate. At the same time, the buffering and tracking of event time is also costlier than - operating with processing time, and typically also introduces more latency. The amount of - extra cost depends mostly on how much out of order the elements arrive, i.e., how long the - time span between the arrival of early and late elements is. With respect to the - "time watermarks", this means that the cost typically depends on how early or late the - watermarks can be generated for their timestamp. - - In relation to :data:`IngestionTime`, the event time is similar, but refers the event's - original time, rather than the time assigned at the data source. Practically, that means - that event time has generally more meaning, but also that it takes longer to determine - that all elements for a certain time have arrived. - """ - - ProcessingTime = 0 - IngestionTime = 1 - EventTime = 2 - - @staticmethod - def _from_j_time_characteristic(j_time_characteristic) -> 'TimeCharacteristic': - return TimeCharacteristic[j_time_characteristic.name()] - - def _to_j_time_characteristic(self): - gateway = get_gateway() - JTimeCharacteristic = gateway.jvm.org.apache.flink.streaming.api.TimeCharacteristic - return getattr(JTimeCharacteristic, self.name) diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java deleted file mode 100644 index eaa04f34e75..00000000000 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java +++ /dev/null @@ -1,98 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.streaming.api; - -import org.apache.flink.annotation.PublicEvolving; -import org.apache.flink.api.common.ExecutionConfig; -import org.apache.flink.api.common.eventtime.WatermarkStrategy; - -/** - * The time characteristic defines how the system determines time for time-dependent order and - * operations that depend on time (such as time windows). - * - * @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link - * TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling - * event-time support anymore. Explicitly using processing-time windows and timers works in - * event-time mode. If you need to disable watermarks, please use {@link - * ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link - * TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link - * WatermarkStrategy}. If you are using generic "time window" operations that change behaviour - * based on the time characteristic, please use equivalent operations that explicitly specify - * processing time or event time. - */ -@PublicEvolving -@Deprecated -public enum TimeCharacteristic { - - /** - * Processing time for operators means that the operator uses the system clock of the machine to - * determine the current time of the data stream. Processing-time windows trigger based on - * wall-clock time and include whatever elements happen to have arrived at the operator at that - * point in time. - * - * <p>Using processing time for window operations results in general in quite non-deterministic - * results, because the contents of the windows depends on the speed in which elements arrive. - * It is, however, the cheapest method of forming windows and the method that introduces the - * least latency. - */ - ProcessingTime, - - /** - * Ingestion time means that the time of each individual element in the stream is determined - * when the element enters the Flink streaming data flow. Operations like windows group the - * elements based on that time, meaning that processing speed within the streaming dataflow does - * not affect windowing, but only the speed at which sources receive elements. - * - * <p>Ingestion time is often a good compromise between processing time and event time. It does - * not need any special manual form of watermark generation, and events are typically not too - * much out-or-order when they arrive at operators; in fact, out-of-orderness can only be - * introduced by streaming shuffles or split/join/union operations. The fact that elements are - * not very much out-of-order means that the latency increase is moderate, compared to event - * time. - */ - IngestionTime, - - /** - * Event time means that the time of each individual element in the stream (also called event) - * is determined by the event's individual custom timestamp. These timestamps either exist in - * the elements from before they entered the Flink streaming dataflow, or are user-assigned at - * the sources. The big implication of this is that it allows for elements to arrive in the - * sources and in all operators out of order, meaning that elements with earlier timestamps may - * arrive after elements with later timestamps. - * - * <p>Operators that window or order data with respect to event time must buffer data until they - * can be sure that all timestamps for a certain time interval have been received. This is - * handled by the so called "time watermarks". - * - * <p>Operations based on event time are very predictable - the result of windowing operations - * is typically identical no matter when the window is executed and how fast the streams - * operate. At the same time, the buffering and tracking of event time is also costlier than - * operating with processing time, and typically also introduces more latency. The amount of - * extra cost depends mostly on how much out of order the elements arrive, i.e., how long the - * time span between the arrival of early and late elements is. With respect to the "time - * watermarks", this means that the cost typically depends on how early or late the watermarks - * can be generated for their timestamp. - * - * <p>In relation to {@link #IngestionTime}, the event time is similar, but refers the event's - * original time, rather than the time assigned at the data source. Practically, that means that - * event time has generally more meaning, but also that it takes longer to determine that all - * elements for a certain time have arrived. - */ - EventTime -} diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java index 0bed680d10b..55d4bef793a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java @@ -72,7 +72,6 @@ import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.DataStreamSource; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; @@ -96,7 +95,6 @@ import org.apache.flink.streaming.api.graph.StreamGraphGenerator; import org.apache.flink.streaming.api.operators.StreamSource; import org.apache.flink.streaming.api.operators.collect.CollectResultIterator; import org.apache.flink.streaming.api.transformations.CacheTransformation; -import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner; import org.apache.flink.util.AbstractID; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -150,10 +148,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { collectIterators.add(iterator); } - /** The time characteristic that is used if none other is set. */ - private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = - TimeCharacteristic.EventTime; - /** * The environment of the context (local by default, cluster if invoked through command line). */ @@ -178,9 +172,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { private final Map<AbstractID, CacheTransformation<?>> cachedTransformations = new HashMap<>(); - /** The time characteristic used by the data streams. */ - private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; - /** * Now we could not migrate this field to configuration. Because this object field remains * directly accessible and modifiable as it is exposed through a getter to users, allowing @@ -703,50 +694,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { // Time characteristic // -------------------------------------------------------------------------------------------- - /** - * Sets the time characteristic for all streams create from this environment, e.g., processing - * time, event time, or ingestion time. - * - * <p>If you set the characteristic to IngestionTime of EventTime this will set a default - * watermark update interval of 200 ms. If this is not applicable for your application you - * should change it using {@link ExecutionConfig#setAutoWatermarkInterval(long)}. - * - * @param characteristic The time characteristic. - * @deprecated In Flink 1.12 the default stream time characteristic has been changed to {@link - * TimeCharacteristic#EventTime}, thus you don't need to call this method for enabling - * event-time support anymore. Explicitly using processing-time windows and timers works in - * event-time mode. If you need to disable watermarks, please use {@link - * ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link - * TimeCharacteristic#IngestionTime}, please manually set an appropriate {@link - * WatermarkStrategy}. If you are using generic "time window" operations (for example - * through {@link - * org.apache.flink.streaming.api.datastream.KeyedStream#window(WindowAssigner)} that change - * behaviour based on the time characteristic, please use equivalent operations that - * explicitly specify processing time or event time. - */ - @PublicEvolving - @Deprecated - public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { - this.timeCharacteristic = Preconditions.checkNotNull(characteristic); - if (characteristic == TimeCharacteristic.ProcessingTime) { - getConfig().setAutoWatermarkInterval(0); - } else { - getConfig().setAutoWatermarkInterval(200); - } - } - - /** - * Gets the time characteristic. - * - * @deprecated See {@link #setStreamTimeCharacteristic(TimeCharacteristic)} for deprecation - * notice. - */ - @PublicEvolving - @Deprecated - public TimeCharacteristic getStreamTimeCharacteristic() { - return timeCharacteristic; - } - /** * Sets all relevant options contained in the {@link ReadableConfig}. It will reconfigure {@link * StreamExecutionEnvironment}, {@link ExecutionConfig} and {@link CheckpointConfig}. @@ -2143,7 +2090,6 @@ public class StreamExecutionEnvironment implements AutoCloseable { // stream graph generation. return new StreamGraphGenerator( new ArrayList<>(transformations), config, checkpointCfg, configuration) - .setTimeCharacteristic(getStreamTimeCharacteristic()) .setSlotSharingGroupResource(slotSharingGroupResources); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java index 6ce5ab9be85..e940c691231 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java @@ -93,8 +93,7 @@ public abstract class KeyedProcessFunction<K, I, O> extends AbstractRichFunction /** * Timestamp of the element currently being processed or timestamp of a firing timer. * - * <p>This might be {@code null}, for example if the time characteristic of your program is - * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + * <p>This might be {@code null}, depending on the stream's watermark strategy. */ public abstract Long timestamp(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java index fc310ab98c0..8976e5a554f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java @@ -92,8 +92,7 @@ public abstract class ProcessFunction<I, O> extends AbstractRichFunction { /** * Timestamp of the element currently being processed or timestamp of a firing timer. * - * <p>This might be {@code null}, for example if the time characteristic of your program is - * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + * <p>This might be {@code null}, depending on the stream's watermark strategy. */ public abstract Long timestamp(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java index e23b11e87ee..8e958c5f156 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java @@ -44,8 +44,7 @@ public abstract class BaseBroadcastProcessFunction extends AbstractRichFunction /** * Timestamp of the element currently being processed or timestamp of a firing timer. * - * <p>This might be {@code null}, for example if the time characteristic of your program is - * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + * <p>This might be {@code null}, depending on the stream's watermark strategy. */ public abstract Long timestamp(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java index d0969078220..38c29977fde 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java @@ -109,8 +109,7 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> extends AbstractRichFunct /** * Timestamp of the element currently being processed or timestamp of a firing timer. * - * <p>This might be {@code null}, for example if the time characteristic of your program is - * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + * <p>This might be {@code null}, depending on the stream's watermark strategy. */ public abstract Long timestamp(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java index 5228d1ff88b..a3ed1a88c18 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java @@ -109,8 +109,7 @@ public abstract class KeyedCoProcessFunction<K, IN1, IN2, OUT> extends AbstractR /** * Timestamp of the element currently being processed or timestamp of a firing timer. * - * <p>This might be {@code null}, for example if the time characteristic of your program is - * set to {@link org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}. + * <p>This might be {@code null}, depending on the stream's watermark strategy. */ public abstract Long timestamp(); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java index 02baed7602d..02a23b40763 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java @@ -312,7 +312,6 @@ public class ContinuousFileReaderOperator<OUT, T extends TimestampedInputSplit> this.sourceContext = StreamSourceContexts.getSourceContext( - getOperatorConfig().getTimeCharacteristic(), getProcessingTimeService(), new Object(), // no actual locking needed output, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java index 7414b822639..c8cf8fe2958 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java @@ -30,11 +30,9 @@ import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.core.memory.ManagedMemoryUseCase; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.util.CorruptConfigurationException; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; import org.apache.flink.streaming.api.operators.SimpleOperatorFactory; import org.apache.flink.streaming.api.operators.StreamOperator; @@ -129,9 +127,6 @@ public class StreamConfig implements Serializable { private static final String STATE_KEY_SERIALIZER = "statekeyser"; - private static final ConfigOption<Integer> TIME_CHARACTERISTIC = - ConfigOptions.key("timechar").intType().defaultValue(-1); - private static final String MANAGED_MEMORY_FRACTION_PREFIX = "managedMemFraction."; private static final String ATTRIBUTE = "attribute"; @@ -304,19 +299,6 @@ public class StreamConfig implements Serializable { .collect(Collectors.toSet()); } - public void setTimeCharacteristic(TimeCharacteristic characteristic) { - config.set(TIME_CHARACTERISTIC, characteristic.ordinal()); - } - - public TimeCharacteristic getTimeCharacteristic() { - int ordinal = config.get(TIME_CHARACTERISTIC, -1); - if (ordinal >= 0) { - return TimeCharacteristic.values()[ordinal]; - } else { - throw new CorruptConfigurationException("time characteristic is not set"); - } - } - public void setTypeSerializerOut(TypeSerializer<?> serializer) { setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java index 0823ed29e32..95761820235 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable; import org.apache.flink.runtime.state.CheckpointStorage; import org.apache.flink.runtime.state.StateBackend; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.lineage.LineageGraph; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; @@ -103,8 +102,6 @@ public class StreamGraph implements Pipeline { private final CheckpointConfig checkpointConfig; private SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.none(); - private TimeCharacteristic timeCharacteristic; - private GlobalStreamExchangeMode globalExchangeMode; private boolean enableCheckpointsAfterTasksFinish; @@ -242,14 +239,6 @@ public class StreamGraph implements Pipeline { .orElse(new ArrayList<>()); } - public TimeCharacteristic getTimeCharacteristic() { - return timeCharacteristic; - } - - public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - this.timeCharacteristic = timeCharacteristic; - } - public GlobalStreamExchangeMode getGlobalStreamExchangeMode() { return globalExchangeMode; } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java index a7a20892409..d6396f66166 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java @@ -40,7 +40,6 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobgraph.JobType; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.lineage.LineageGraph; import org.apache.flink.streaming.api.lineage.LineageGraphUtils; @@ -136,9 +135,6 @@ public class StreamGraphGenerator { public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM = KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM; - public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = - TimeCharacteristic.ProcessingTime; - public static final String DEFAULT_STREAMING_JOB_NAME = "Flink Streaming Job"; public static final String DEFAULT_BATCH_JOB_NAME = "Flink Batch Job"; @@ -156,8 +152,6 @@ public class StreamGraphGenerator { // Records the slot sharing groups and their corresponding fine-grained ResourceProfile private final Map<String, ResourceProfile> slotSharingGroupResources = new HashMap<>(); - private TimeCharacteristic timeCharacteristic = DEFAULT_TIME_CHARACTERISTIC; - private SavepointRestoreSettings savepointRestoreSettings; private boolean shouldExecuteInBatchMode; @@ -228,11 +222,6 @@ public class StreamGraphGenerator { this.savepointRestoreSettings = SavepointRestoreSettings.fromConfiguration(configuration); } - public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - this.timeCharacteristic = timeCharacteristic; - return this; - } - /** * Specify fine-grained resource requirements for slot sharing groups. * @@ -307,7 +296,6 @@ public class StreamGraphGenerator { private void configureStreamGraph(final StreamGraph graph) { checkNotNull(graph); - graph.setTimeCharacteristic(timeCharacteristic); graph.setVertexDescriptionMode(configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE)); graph.setVertexNameIncludeIndexPrefix( configuration.get(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX)); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 570335a8789..1059ee14885 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -1129,8 +1129,6 @@ public class StreamingJobGraphGenerator { config.setStreamOperatorFactory(vertex.getOperatorFactory()); - config.setTimeCharacteristic(streamGraph.getTimeCharacteristic()); - final CheckpointConfig checkpointCfg = streamGraph.getCheckpointConfig(); config.setStateBackend(streamGraph.getStateBackend()); diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java index d40f6e6f1c8..444a3cfaf5b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java @@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.annotation.VisibleForTesting; import org.apache.flink.configuration.Configuration; import org.apache.flink.configuration.MetricOptions; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.OperatorChain; @@ -75,8 +74,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> final OperatorChain<?, ?> operatorChain) throws Exception { - final TimeCharacteristic timeCharacteristic = getOperatorConfig().getTimeCharacteristic(); - final Configuration configuration = this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration(); final long latencyTrackingInterval = @@ -99,7 +96,6 @@ public class StreamSource<OUT, SRC extends SourceFunction<OUT>> this.ctx = StreamSourceContexts.getSourceContext( - timeCharacteristic, getProcessingTimeService(), lockingObject, collector, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java index f4d3416cec0..ba44c53b566 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; @@ -38,18 +37,7 @@ import java.util.concurrent.ScheduledFuture; @Deprecated public class StreamSourceContexts { - /** - * Depending on the {@link TimeCharacteristic}, this method will return the adequate {@link - * SourceFunction.SourceContext}. That is: - * - * <ul> - * <li>{@link TimeCharacteristic#IngestionTime} = {@code AutomaticWatermarkContext} - * <li>{@link TimeCharacteristic#ProcessingTime} = {@code NonTimestampContext} - * <li>{@link TimeCharacteristic#EventTime} = {@code ManualWatermarkContext} - * </ul> - */ public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext( - TimeCharacteristic timeCharacteristic, ProcessingTimeService processingTimeService, Object checkpointLock, Output<StreamRecord<OUT>> output, @@ -57,37 +45,14 @@ public class StreamSourceContexts { long idleTimeout, boolean emitProgressiveWatermarks) { - final SourceFunction.SourceContext<OUT> ctx; - switch (timeCharacteristic) { - case EventTime: - ctx = - new ManualWatermarkContext<>( - output, - processingTimeService, - checkpointLock, - idleTimeout, - emitProgressiveWatermarks); - - break; - case IngestionTime: - Preconditions.checkState( - emitProgressiveWatermarks, - "Ingestion time is not available when emitting progressive watermarks " - + "is disabled."); - ctx = - new AutomaticWatermarkContext<>( - output, - watermarkInterval, - processingTimeService, - checkpointLock, - idleTimeout); - break; - case ProcessingTime: - ctx = new NonTimestampContext<>(checkpointLock, output); - break; - default: - throw new IllegalArgumentException(String.valueOf(timeCharacteristic)); - } + final SourceFunction.SourceContext<OUT> ctx = + new ManualWatermarkContext<>( + output, + processingTimeService, + checkpointLock, + idleTimeout, + emitProgressiveWatermarks); + return new SwitchingOnClose<>(ctx); } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java index e8591ad5cc8..b99061bfe90 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java @@ -79,7 +79,6 @@ import org.apache.flink.runtime.taskmanager.AsyncExceptionHandler; import org.apache.flink.runtime.taskmanager.AsynchronousException; import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.InternalTimeServiceManager; @@ -1206,8 +1205,8 @@ public abstract class StreamTask<OUT, OP extends StreamOperator<OUT>> } boolean isSerializingTimestamps() { - TimeCharacteristic tc = configuration.getTimeCharacteristic(); - return tc == TimeCharacteristic.EventTime | tc == TimeCharacteristic.IngestionTime; + // Only used in StreamIterationHead and will soon be removed + return true; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java index f8779fe9df9..0e65b1ea50d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java @@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.operators.testutils.ExpectedTestException; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.mailbox.Mail; import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness; @@ -41,7 +40,6 @@ class ContinuousFileReaderOperatorTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(failingFormat()); harness.getExecutionConfig().setAutoWatermarkInterval(10); - harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime); assertThatThrownBy( () -> { @@ -59,7 +57,6 @@ class ContinuousFileReaderOperatorTest { OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> harness = createHarness(failingFormat()); harness.getExecutionConfig().setAutoWatermarkInterval(10); - harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime); assertThatThrownBy( () -> { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java index 367cd0ddfa9..6210e4860ee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java @@ -18,7 +18,6 @@ package org.apache.flink.streaming.api.operators; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.watermark.Watermark; import org.apache.flink.streaming.runtime.streamrecord.StreamElement; @@ -91,7 +90,6 @@ class StreamSourceContextIdleDetectionTests { SourceFunction.SourceContext<String> context = StreamSourceContexts.getSourceContext( - TimeCharacteristic.EventTime, processingTimeService, new Object(), new CollectorOutput<>(output), @@ -163,177 +161,6 @@ class StreamSourceContextIdleDetectionTests { } } - /** - * Test scenario (idleTimeout = 100, watermarkInterval = 40): (1) Start from 20 as initial time. - * (2) As soon as time reaches 120, status should have been toggled to IDLE. (3) After some - * arbitrary time (until 320), the status should remain IDLE, and no watermarks should have been - * emitted. (4) Emit a record at 330. Status should become ACTIVE. This should schedule a - * idleness detection to be fired at 430. (5) Emit another record at 350 (which is before the - * next check). This should make the idleness check pass. (6) Advance time to 430 and trigger - * idleness detection. The status should still be ACTIVE due to step (5). This should schedule a - * idleness detection to be fired at 530. (7) Advance time to 460, in which a watermark emission - * task should be fired. Idleness detection should have been "piggy-backed" in the task, - * allowing the status to be toggled to IDLE before the next actual idle detection task at 530. - * - * <p>Inline comments will refer to the corresponding tested steps in the scenario. - */ - @TestTemplate - void testAutomaticWatermarkContext() throws Exception { - long watermarkInterval = 40; - long idleTimeout = 100; - long initialTime = 20; - - TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - processingTimeService.setCurrentTime(initialTime); - - final List<StreamElement> output = new ArrayList<>(); - final List<StreamElement> expectedOutput = new ArrayList<>(); - - SourceFunction.SourceContext<String> context = - StreamSourceContexts.getSourceContext( - TimeCharacteristic.IngestionTime, - processingTimeService, - new Object(), - new CollectorOutput<String>(output), - watermarkInterval, - idleTimeout, - true); - - // -------------------------- begin test scenario -------------------------- - - // corresponds to step (2) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + watermarkInterval); - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - processingTimeService.setCurrentTime(initialTime + 2 * watermarkInterval); - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - processingTimeService.setCurrentTime(initialTime + idleTimeout); - expectedOutput.add(WatermarkStatus.IDLE); - assertThat(output).isEqualTo(expectedOutput); - - // corresponds to step (3) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3 * watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 4 * watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout); - processingTimeService.setCurrentTime(initialTime + 6 * watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 7 * watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout); - assertThat(output).isEqualTo(expectedOutput); - - // corresponds to step (4) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + idleTimeout / 10); - switch (testMethod) { - case COLLECT: - expectedOutput.add(WatermarkStatus.ACTIVE); - context.collect("msg"); - expectedOutput.add( - new StreamRecord<>( - "msg", processingTimeService.getCurrentProcessingTime())); - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - assertThat(output).isEqualTo(expectedOutput); - break; - case COLLECT_WITH_TIMESTAMP: - expectedOutput.add(WatermarkStatus.ACTIVE); - context.collectWithTimestamp( - "msg", processingTimeService.getCurrentProcessingTime()); - expectedOutput.add( - new StreamRecord<>( - "msg", processingTimeService.getCurrentProcessingTime())); - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - assertThat(output).isEqualTo(expectedOutput); - break; - case EMIT_WATERMARK: - // for emitWatermark, since the watermark will be blocked, - // it should not make the status become active; - // from here on, the status should remain idle for the emitWatermark variant test - context.emitWatermark( - new Watermark(processingTimeService.getCurrentProcessingTime())); - assertThat(output).isEqualTo(expectedOutput); - } - - // corresponds to step (5) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 8 * watermarkInterval); - processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 3 * idleTimeout / 10); - switch (testMethod) { - case COLLECT: - context.collect("msg"); - expectedOutput.add( - new StreamRecord<>( - "msg", processingTimeService.getCurrentProcessingTime())); - assertThat(output).isEqualTo(expectedOutput); - break; - case COLLECT_WITH_TIMESTAMP: - context.collectWithTimestamp( - "msg", processingTimeService.getCurrentProcessingTime()); - expectedOutput.add( - new StreamRecord<>( - "msg", processingTimeService.getCurrentProcessingTime())); - assertThat(output).isEqualTo(expectedOutput); - break; - case EMIT_WATERMARK: - context.emitWatermark( - new Watermark(processingTimeService.getCurrentProcessingTime())); - assertThat(output).isEqualTo(expectedOutput); - } - - processingTimeService.setCurrentTime(initialTime + 9 * watermarkInterval); - switch (testMethod) { - case COLLECT: - case COLLECT_WITH_TIMESTAMP: - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - assertThat(output).isEqualTo(expectedOutput); - break; - case EMIT_WATERMARK: - assertThat(output).isEqualTo(expectedOutput); - } - - processingTimeService.setCurrentTime(initialTime + 10 * watermarkInterval); - switch (testMethod) { - case COLLECT: - case COLLECT_WITH_TIMESTAMP: - expectedOutput.add( - new Watermark( - processingTimeService.getCurrentProcessingTime() - - (processingTimeService.getCurrentProcessingTime() - % watermarkInterval))); - assertThat(output).isEqualTo(expectedOutput); - break; - case EMIT_WATERMARK: - assertThat(output).isEqualTo(expectedOutput); - } - - // corresponds to step (6) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + idleTimeout / 10); - assertThat(output).isEqualTo(expectedOutput); - - // corresponds to step (7) of scenario (please see method-level Javadoc comment) - processingTimeService.setCurrentTime(initialTime + 11 * watermarkInterval); - // emit watermark does not change the previous status - if (testMethod != TestMethod.EMIT_WATERMARK) { - expectedOutput.add(WatermarkStatus.IDLE); - } - assertThat(output).isEqualTo(expectedOutput); - } - @Parameters(name = "TestMethod = {0}") @SuppressWarnings("unchecked") private static Collection<TestMethod[]> timeCharacteristic() { diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java index cce23535034..c4ee20a42cb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java @@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.core.fs.Path; import org.apache.flink.runtime.checkpoint.OperatorSubtaskState; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory; import org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit; import org.apache.flink.streaming.api.watermark.Watermark; @@ -180,7 +179,6 @@ class ContinuousFileProcessingRescalingTest { maxParallelism, noOfTasks, taskIdx); - testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime); return testHarness; } diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java index 529e7f1bc97..8fd1d2b4682 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java @@ -27,7 +27,6 @@ import org.apache.flink.runtime.operators.testutils.MockEnvironment; import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder; import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.Output; @@ -237,7 +236,6 @@ class StreamSourceOperatorLatencyMetricsTest { StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setStateBackend(new HashMapStateBackend()); - cfg.setTimeCharacteristic(TimeCharacteristic.EventTime); cfg.setOperatorID(new OperatorID()); cfg.serializeAllConfigs(); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java index 2b0cbea8fa9..19a0e7d8404 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java @@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.ReduceFunction; import org.apache.flink.api.common.state.ListStateDescriptor; import org.apache.flink.api.common.state.ReducingStateDescriptor; import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.WindowedStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -55,7 +54,6 @@ class TimeWindowTranslationTest { @Test void testAlignedWindowDeprecation() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source = env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -105,7 +103,6 @@ class TimeWindowTranslationTest { @SuppressWarnings("rawtypes") void testReduceEventTimeWindows() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); @@ -133,7 +130,6 @@ class TimeWindowTranslationTest { @SuppressWarnings("rawtypes") void testApplyEventTimeWindows() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime); DataStream<Tuple2<String, Integer>> source = env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2)); diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java index a831ffa64ea..8e8edd9ce3e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java @@ -74,7 +74,6 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.util.EnvironmentInformation; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -139,7 +138,6 @@ class InterruptSensitiveRestoreTest { IN_RESTORE_LATCH.reset(); Configuration taskConfig = new Configuration(); StreamConfig cfg = new StreamConfig(taskConfig); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); switch (mode) { case OPERATOR_MANAGED: case OPERATOR_RAW: diff --git a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java index 4f6791e3268..2d93e3375f3 100644 --- a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java +++ b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java @@ -56,7 +56,6 @@ import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider; import org.apache.flink.runtime.state.ttl.TtlTimeProvider; import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest; @@ -793,14 +792,6 @@ public class AbstractStreamOperatorTestHarness<OUT> implements AutoCloseable { return processingTimeService.getCurrentProcessingTime(); } - public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) { - this.config.setTimeCharacteristic(timeCharacteristic); - } - - public TimeCharacteristic getTimeCharacteristic() { - return this.config.getTimeCharacteristic(); - } - public boolean wasFailedExternally() { return wasFailedExternally; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java index 89a1104a3b6..86d7870b24e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java @@ -32,7 +32,6 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironment; import org.apache.flink.runtime.state.StateInitializationContext; import org.apache.flink.runtime.state.StateSnapshotContext; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; @@ -161,7 +160,6 @@ class AbstractUdfStreamOperatorLifecycleTest { cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, true)); cfg.setOperatorID(new OperatorID()); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { Task task = @@ -192,7 +190,6 @@ class AbstractUdfStreamOperatorLifecycleTest { MockSourceFunction srcFun = new MockSourceFunction(); cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, false)); cfg.setOperatorID(new OperatorID()); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { Task task = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java index eafe91b5940..93d526a4d04 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java @@ -18,44 +18,23 @@ package org.apache.flink.streaming.runtime.operators; -import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.api.common.typeinfo.TypeInformation; -import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.execution.CancelTaskException; -import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.jobgraph.OperatorID; -import org.apache.flink.runtime.operators.testutils.DummyEnvironment; -import org.apache.flink.runtime.state.hashmap.HashMapStateBackend; -import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; -import org.apache.flink.streaming.api.operators.Output; import org.apache.flink.streaming.api.operators.StreamSource; -import org.apache.flink.streaming.api.operators.StreamSourceContexts; import org.apache.flink.streaming.api.watermark.Watermark; -import org.apache.flink.streaming.runtime.streamrecord.StreamElement; -import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; import org.apache.flink.streaming.runtime.tasks.SourceStreamTask; import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness; -import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService; -import org.apache.flink.streaming.runtime.tasks.TimerService; -import org.apache.flink.streaming.util.CollectorOutput; -import org.apache.flink.streaming.util.MockStreamTask; -import org.apache.flink.streaming.util.MockStreamTaskBuilder; import org.apache.flink.util.ExceptionUtils; import org.junit.jupiter.api.Test; -import java.util.ArrayList; -import java.util.List; - -import static org.apache.flink.streaming.api.operators.StreamOperatorUtils.setupStreamOperator; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.mockito.Mockito.mock; /** Tests for {@link StreamSource} operators. */ @SuppressWarnings("serial") @@ -126,86 +105,8 @@ class StreamSourceOperatorWatermarksTest { assertThat(testHarness.getOutput()).isEmpty(); } - @Test - void testAutomaticWatermarkContext() throws Exception { - - // regular stream source operator - final StreamSource<String, InfiniteSource<String>> operator = - new StreamSource<>(new InfiniteSource<>()); - - long watermarkInterval = 10; - TestProcessingTimeService processingTimeService = new TestProcessingTimeService(); - processingTimeService.setCurrentTime(0); - - MockStreamTask<?, ?> task = - setupSourceOperator( - operator, - TimeCharacteristic.IngestionTime, - watermarkInterval, - processingTimeService); - - final List<StreamElement> output = new ArrayList<>(); - - StreamSourceContexts.getSourceContext( - TimeCharacteristic.IngestionTime, - processingTimeService, - task.getCheckpointLock(), - new CollectorOutput<String>(output), - operator.getExecutionConfig().getAutoWatermarkInterval(), - -1, - true); - - // periodically emit the watermarks - // even though we start from 1 the watermark are still - // going to be aligned with the watermark interval. - - for (long i = 1; i < 100; i += watermarkInterval) { - processingTimeService.setCurrentTime(i); - } - - assertThat(output).hasSize(9); - - long nextWatermark = 0; - for (StreamElement el : output) { - nextWatermark += watermarkInterval; - Watermark wm = (Watermark) el; - assertThat(wm.getTimestamp()).isEqualTo(nextWatermark); - } - } - // ------------------------------------------------------------------------ - @SuppressWarnings("unchecked") - private static <T> MockStreamTask setupSourceOperator( - StreamSource<T, ?> operator, - TimeCharacteristic timeChar, - long watermarkInterval, - final TimerService timeProvider) - throws Exception { - - ExecutionConfig executionConfig = new ExecutionConfig(); - executionConfig.setAutoWatermarkInterval(watermarkInterval); - - StreamConfig cfg = new StreamConfig(new Configuration()); - cfg.setStateBackend(new HashMapStateBackend()); - cfg.setCheckpointStorage(new JobManagerCheckpointStorage()); - - cfg.setTimeCharacteristic(timeChar); - cfg.setOperatorID(new OperatorID()); - - Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0); - - MockStreamTask mockTask = - new MockStreamTaskBuilder(env) - .setConfig(cfg) - .setExecutionConfig(executionConfig) - .setTimerService(timeProvider) - .build(); - - setupStreamOperator(operator, mockTask, cfg, (Output<StreamRecord<T>>) mock(Output.class)); - return mockTask; - } - private static <T> StreamTaskTestHarness<T> setupSourceStreamTask( StreamSource<T, ?> sourceOperator, TypeInformation<T> outputType) { @@ -236,7 +137,6 @@ class StreamSourceOperatorWatermarksTest { StreamConfig streamConfig = testHarness.getStreamConfig(); streamConfig.setStreamOperator(sourceOperator); streamConfig.setOperatorID(new OperatorID()); - streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); return testHarness; } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java index 1491f4142fe..822947e71e4 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java @@ -50,7 +50,6 @@ import org.apache.flink.runtime.operators.testutils.ExpectedTestException; import org.apache.flink.runtime.state.CheckpointStorageLocationReference; import org.apache.flink.runtime.taskmanager.CheckpointResponder; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.ListCheckpointed; import org.apache.flink.streaming.api.functions.source.legacy.FromElementsFunction; import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; @@ -236,7 +235,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { .finish(); StreamConfig streamConfig = testHarness.getStreamConfig(); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.invoke(); testHarness.waitForTaskCompletion(); @@ -275,7 +273,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { .finish(); StreamConfig streamConfig = testHarness.getStreamConfig(); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); ConcurrentLinkedQueue<Object> expectedOutput = new ConcurrentLinkedQueue<>(); @@ -340,7 +337,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { .finish(); StreamConfig streamConfig = testHarness.getStreamConfig(); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); testHarness.invoke(); CancelLockingSource.awaitRunning(); @@ -452,9 +448,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase { STRING_TYPE_INFO.createSerializer(new SerializerConfigImpl())) .finish(); - StreamConfig streamConfig = testHarness.getStreamConfig(); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); - testHarness.invoke(); try { testHarness.waitForTaskCompletion(); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java index e16b2f73f49..f5098c5b971 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java @@ -45,7 +45,6 @@ import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.taskmanager.TestCheckpointResponder; import org.apache.flink.runtime.throughput.BufferDebloatConfiguration; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig; @@ -117,7 +116,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { TypeInformation<OUT> outputType) { this.taskFactory = checkNotNull(taskFactory); outputSerializer = outputType.createSerializer(executionConfig.getSerializerConfig()); - streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); } public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyExecutionConfig( @@ -381,7 +379,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> { ResultPartitionType.PIPELINED_BOUNDED)); StreamConfig sourceConfig = new StreamConfig(new Configuration()); - sourceConfig.setTimeCharacteristic(streamConfig.getTimeCharacteristic()); sourceConfig.setVertexNonChainedOutputs(streamOutputsInOrder); sourceConfig.setChainedOutputs(chainedOutputs); sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer()); diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java index 94430105a72..5f254a9ae57 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java @@ -58,7 +58,6 @@ import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.operators.AbstractStreamOperator; @@ -159,7 +158,6 @@ class StreamTaskSystemExitTest { final StreamConfig streamConfig = new StreamConfig(taskConfiguration); streamConfig.setOperatorID(new OperatorID()); streamConfig.setStreamOperator(operator); - streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); // for source run streamConfig.serializeAllConfigs(); final JobInformation jobInformation = diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java index 33a339c5d51..9593fa85418 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java @@ -114,7 +114,6 @@ import org.apache.flink.runtime.taskmanager.TestTaskBuilder; import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture; import org.apache.flink.runtime.throughput.ThroughputCalculator; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction; import org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; @@ -478,7 +477,6 @@ public class StreamTaskTest { final StreamConfig cfg = new StreamConfig(new Configuration()); cfg.setOperatorID(new OperatorID(4711L, 42L)); cfg.setStreamOperator(new SlowlyDeserializingOperator()); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); cfg.serializeAllConfigs(); final TaskManagerActions taskManagerActions = spy(new NoOpTaskManagerActions()); @@ -520,7 +518,6 @@ public class StreamTaskTest { TestStreamSource<Long, MockSourceFunction> streamSource = new TestStreamSource<>(new MockSourceFunction()); cfg.setStreamOperator(streamSource); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); try (ShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { Task task = @@ -561,7 +558,6 @@ public class StreamTaskTest { TestStreamSource<Long, MockSourceFunction> streamSource = new TestStreamSource<>(new MockSourceFunction()); cfg.setStreamOperator(streamSource); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { @@ -1532,7 +1528,6 @@ public class StreamTaskTest { FailedSource failedSource = new FailedSource(); cfg.setStreamOperator(new TestStreamSource<String, FailedSource>(failedSource)); - cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); try (NettyShuffleEnvironment shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build()) { diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java index cc5597cca10..d4e072cfe8e 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java @@ -51,7 +51,6 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig; import org.apache.flink.runtime.state.TestTaskStateManager; import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo; import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.graph.NonChainedOutput; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamNode; @@ -227,7 +226,6 @@ public class StreamTaskTestHarness<OUT> { Preconditions.checkState(!setupCalled, "This harness was already setup."); setupCalled = true; streamConfig.setChainStart(); - streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime); streamConfig.setNumberOfOutputs(1); streamConfig.setTypeSerializerOut(outputSerializer); streamConfig.setVertexID(0); diff --git a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java index c69c25983ff..debf4c4cbaf 100644 --- a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java @@ -24,7 +24,6 @@ import org.apache.flink.api.dag.Transformation; import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.api.java.typeutils.TupleTypeInfo; import org.apache.flink.api.java.typeutils.TypeExtractor; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.table.api.Schema; @@ -298,13 +297,11 @@ public abstract class AbstractStreamTableEnvironmentImpl extends TableEnvironmen } protected void validateTimeCharacteristic(boolean isRowtimeDefined) { - if (isRowtimeDefined - && executionEnvironment.getStreamTimeCharacteristic() - != TimeCharacteristic.EventTime) { + if (isRowtimeDefined && executionEnvironment.getConfig().getAutoWatermarkInterval() <= 0) { throw new ValidationException( String.format( - "A rowtime attribute requires an EventTime time characteristic in stream environment. But is: %s", - executionEnvironment.getStreamTimeCharacteristic())); + "A rowtime attribute requires a positive watermark interval in stream environment. But is: %s", + executionEnvironment.getConfig().getAutoWatermarkInterval())); } } diff --git a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java index cb65d4ea7fb..89aa1238d22 100644 --- a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java +++ b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java @@ -52,12 +52,10 @@ import org.apache.flink.table.functions.AggregateFunction; import org.apache.flink.table.functions.TableAggregateFunction; import org.apache.flink.table.functions.TableFunction; import org.apache.flink.table.functions.UserDefinedFunctionHelper; -import org.apache.flink.table.legacy.sources.TableSource; import org.apache.flink.table.module.ModuleManager; import org.apache.flink.table.operations.ExternalQueryOperation; import org.apache.flink.table.operations.OutputConversionModifyOperation; import org.apache.flink.table.resource.ResourceManager; -import org.apache.flink.table.sources.TableSourceValidation; import org.apache.flink.table.types.AbstractDataType; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.utils.TypeConversions; @@ -392,10 +390,4 @@ public final class StreamTableEnvironmentImpl extends AbstractStreamTableEnviron OutputConversionModifyOperation.UpdateMode.RETRACT); return toStreamInternal(table, modifyOperation); } - - @Override - protected void validateTableSource(TableSource<?> tableSource) { - super.validateTableSource(tableSource); - validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource)); - } } diff --git a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala index 3a90a0f650d..5c3e2c556c4 100644 --- a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala +++ b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.api.bridge.scala.internal import org.apache.flink.annotation.Internal import org.apache.flink.api.common.typeinfo.TypeInformation -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.table.api._ @@ -265,22 +264,6 @@ class StreamTableEnvironmentImpl( ) } - override protected def validateTableSource(tableSource: TableSource[_]): Unit = { - super.validateTableSource(tableSource) - // check that event-time is enabled if table source includes rowtime attributes - if ( - TableSourceValidation.hasRowtimeAttribute(tableSource) && - executionEnvironment.getStreamTimeCharacteristic != TimeCharacteristic.EventTime - ) { - throw new TableException( - String.format( - "A rowtime attribute requires an EventTime time characteristic in stream " + - "environment. But is: %s}", - executionEnvironment.getStreamTimeCharacteristic - )) - } - } - override def createTemporaryView[T]( path: String, dataStream: DataStream[T], diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java index 422323fcf8a..a6b3a324281 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java @@ -27,7 +27,6 @@ import org.apache.flink.configuration.ReadableConfig; import org.apache.flink.core.execution.CheckpointingMode; import org.apache.flink.legacy.table.sinks.StreamTableSink; import org.apache.flink.legacy.table.sources.StreamTableSource; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.environment.CheckpointConfig; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.graph.StreamGraph; @@ -174,17 +173,6 @@ public class DummyStreamExecutionEnvironment extends StreamExecutionEnvironment return realExecEnv.getCheckpointingConsistencyMode(); } - @Override - public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) { - throw new UnsupportedOperationException( - "This is a dummy StreamExecutionEnvironment, setStreamTimeCharacteristic method is unsupported."); - } - - @Override - public TimeCharacteristic getStreamTimeCharacteristic() { - return realExecEnv.getStreamTimeCharacteristic(); - } - @Override public JobExecutionResult execute() throws Exception { throw new UnsupportedOperationException( diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala index 514e2f247b2..339f8a4aa15 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala @@ -19,7 +19,6 @@ package org.apache.flink.table.planner.runtime.stream.sql import org.apache.flink.api.common.typeinfo.TypeInformation import org.apache.flink.api.java.typeutils.RowTypeInfo -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.datastream.DataStream import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ @@ -181,10 +180,6 @@ class AggregateITCase(aggMode: AggMode, miniBatch: MiniBatchMode, backend: State @TestTemplate def testAggregationWithoutWatermark(): Unit = { - // NOTE: Different from AggregateITCase, we do not set stream time characteristic - // of environment to event time, so that emitWatermark() actually does nothing. - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - val data = new mutable.MutableList[(Int, Int)] data.+=((1, 1)) data.+=((2, 2)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala index 1a775b511bd..e55fa33070b 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.scala._ import org.apache.flink.table.api.config.ExecutionConfigOptions @@ -1378,10 +1377,6 @@ class JoinITCase(miniBatch: MiniBatchMode, state: StateBackendMode, enableAsyncS @TestTemplate def testJoinWithoutWatermark(): Unit = { - // NOTE: Different from AggregateITCase, we do not set stream time characteristic - // of environment to event time, so that emitWatermark() actually does nothing. - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) - val data1 = new mutable.MutableList[(Int, Long)] data1.+=((1, 1L)) data1.+=((2, 2L)) diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala index a8fe68f1d78..07343f4af68 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala @@ -17,7 +17,6 @@ */ package org.apache.flink.table.planner.runtime.stream.sql -import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor import org.apache.flink.table.api._ @@ -50,7 +49,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val sqlQuery = """ @@ -99,7 +97,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val result = tEnv .sqlQuery( "SELECT amount, currency, proctime() as proctime " + @@ -114,7 +111,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime) val sqlQuery = """ @@ -170,7 +166,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode) val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) env.setParallelism(1) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ @@ -234,7 +229,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode) def testNestedTemporalJoin(): Unit = { val env = StreamExecutionEnvironment.getExecutionEnvironment val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING) - env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime) val sqlQuery = """ diff --git a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala index 8aa8165012d..832ab36255a 100644 --- a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala +++ b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala @@ -27,8 +27,8 @@ import org.apache.flink.legacy.table.sinks.{AppendStreamTableSink, RetractStream import org.apache.flink.legacy.table.sources.StreamTableSource import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode -import org.apache.flink.streaming.api.{environment, TimeCharacteristic} import org.apache.flink.streaming.api.datastream.DataStream +import org.apache.flink.streaming.api.environment import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, StreamExecutionEnvironment} import org.apache.flink.table.api._ import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => JavaStreamTableEnv} @@ -1717,17 +1717,6 @@ object TableTestUtil { .map( (f: Array[Expression]) => { val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f) - // check if event-time is enabled - if ( - fieldsInfo.isRowtimeDefined && - (execEnv.getStreamTimeCharacteristic ne TimeCharacteristic.EventTime) - ) { - throw new ValidationException( - String.format( - "A rowtime attribute requires an EventTime time characteristic in stream " + - "environment. But is: %s", - execEnv.getStreamTimeCharacteristic)) - } fieldsInfo }) .getOrElse(FieldInfoUtils.getFieldsInfo(streamType)) diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java index 1f20ea83b0a..7853915e9eb 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java @@ -21,7 +21,6 @@ package org.apache.flink.test.streaming.runtime; import org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.runtime.client.JobExecutionException; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction; @@ -36,11 +35,7 @@ import org.apache.flink.util.ExceptionUtils; import org.junit.Assert; import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.Parameterized; -import java.util.Arrays; -import java.util.Collection; import java.util.Optional; import java.util.concurrent.Semaphore; @@ -52,15 +47,8 @@ import static org.junit.Assert.assertTrue; * <p>These tests ensure that exceptions are properly forwarded from the timer thread to the task * thread and that operator methods are not invoked concurrently. */ -@RunWith(Parameterized.class) public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 { - private final TimeCharacteristic timeCharacteristic; - - public StreamTaskTimerITCase(TimeCharacteristic characteristic) { - timeCharacteristic = characteristic; - } - /** * Note: this test fails if we don't check for exceptions in the source contexts and do not * synchronize in the source contexts. @@ -69,7 +57,6 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 { public void testOperatorChainedToSource() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(timeCharacteristic); env.setParallelism(1); DataStream<String> source = env.addSource(new InfiniteTestSource()); @@ -106,7 +93,6 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 { @Test public void testOneInputOperatorWithoutChaining() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(timeCharacteristic); env.setParallelism(1); DataStream<String> source = env.addSource(new InfiniteTestSource()); @@ -123,7 +109,6 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 { @Test public void testTwoInputOperatorWithoutChaining() throws Exception { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - env.setStreamTimeCharacteristic(timeCharacteristic); env.setParallelism(1); DataStream<String> source = env.addSource(new InfiniteTestSource()); @@ -290,16 +275,4 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 { running = false; } } - - // ------------------------------------------------------------------------ - // parametrization - // ------------------------------------------------------------------------ - - @Parameterized.Parameters(name = "Time Characteristic = {0}") - public static Collection<Object[]> executionModes() { - return Arrays.asList( - new Object[] {TimeCharacteristic.ProcessingTime}, - new Object[] {TimeCharacteristic.IngestionTime}, - new Object[] {TimeCharacteristic.EventTime}); - } } diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java index cb4799343f9..14238407b46 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java @@ -42,7 +42,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointException; import org.apache.flink.runtime.checkpoint.CheckpointFailureReason; import org.apache.flink.runtime.client.JobStatusMessage; import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration; -import org.apache.flink.streaming.api.TimeCharacteristic; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org.apache.flink.streaming.api.functions.co.CoMapFunction; @@ -666,38 +665,11 @@ public class TimestampITCase extends TestLogger { CustomOperator.finalWatermarks[0].get(0).getTimestamp() == Long.MAX_VALUE); } - /** - * This verifies that an event time source works when setting stream time characteristic to - * processing time. In this case, the watermarks should just be swallowed apart from the last - * final watermark marking the end of time. - */ - @Test - public void testEventTimeSourceWithProcessingTime() throws Exception { - StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); - - env.setParallelism(2); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); - - DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 10)); - - source1.map(new IdentityMap()) - .transform( - "Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new CustomOperator(false)); - - env.execute(); - - // verify that we don't get any watermarks, the source is used as watermark source in - // other tests, so it normally emits watermarks - Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1); - Assert.assertEquals(Watermark.MAX_WATERMARK, CustomOperator.finalWatermarks[0].get(0)); - } - @Test public void testErrorOnEventTimeOverProcessingTime() { StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(2); - env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime); DataStream<Tuple2<String, Integer>> source1 = env.fromData(new Tuple2<>("a", 1), new Tuple2<>("b", 2));
