This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit d7c87f23fa32addfbc2ae21dd9645dd29f5684b8
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Mon Sep 30 15:49:38 2024 +0800

    [FLINK-36355][runtime] Remove deprecated TimeCharacteristic
---
 .../file/table/stream/StreamingFileWriterTest.java |   2 -
 .../flink/api/common/state/StateTtlConfig.java     |   5 +-
 .../datastream/impl/ExecutionEnvironmentImpl.java  |   7 +-
 .../python/datastream/data_stream_job.py           |   3 +-
 .../ContinuousFileProcessingMigrationTest.java     |   3 -
 .../hdfstests/ContinuousFileProcessingTest.java    | 191 ---------------------
 .../stream_execution_environment.rst               |   2 -
 .../docs/reference/pyflink.datastream/timer.rst    |  11 --
 flink-python/pyflink/datastream/__init__.py        |   5 -
 flink-python/pyflink/datastream/functions.py       |  15 +-
 .../datastream/stream_execution_environment.py     |  35 ----
 .../pyflink/datastream/tests/test_data_stream.py   |   6 +-
 .../pyflink/datastream/time_characteristic.py      |  95 ----------
 .../flink/streaming/api/TimeCharacteristic.java    |  98 -----------
 .../environment/StreamExecutionEnvironment.java    |  54 ------
 .../api/functions/KeyedProcessFunction.java        |   3 +-
 .../streaming/api/functions/ProcessFunction.java   |   3 +-
 .../functions/co/BaseBroadcastProcessFunction.java |   3 +-
 .../api/functions/co/CoProcessFunction.java        |   3 +-
 .../api/functions/co/KeyedCoProcessFunction.java   |   3 +-
 .../source/ContinuousFileReaderOperator.java       |   1 -
 .../flink/streaming/api/graph/StreamConfig.java    |  18 --
 .../flink/streaming/api/graph/StreamGraph.java     |  11 --
 .../streaming/api/graph/StreamGraphGenerator.java  |  12 --
 .../api/graph/StreamingJobGraphGenerator.java      |   2 -
 .../streaming/api/operators/StreamSource.java      |   4 -
 .../api/operators/StreamSourceContexts.java        |  51 +-----
 .../flink/streaming/runtime/tasks/StreamTask.java  |   5 +-
 .../source/ContinuousFileReaderOperatorTest.java   |   3 -
 .../StreamSourceContextIdleDetectionTests.java     | 173 -------------------
 .../ContinuousFileProcessingRescalingTest.java     |   2 -
 .../StreamSourceOperatorLatencyMetricsTest.java    |   2 -
 .../windowing/TimeWindowTranslationTest.java       |   4 -
 .../tasks/InterruptSensitiveRestoreTest.java       |   2 -
 .../util/AbstractStreamOperatorTestHarness.java    |   9 -
 .../AbstractUdfStreamOperatorLifecycleTest.java    |   3 -
 .../StreamSourceOperatorWatermarksTest.java        | 100 -----------
 .../runtime/tasks/SourceStreamTaskTest.java        |   7 -
 .../tasks/StreamTaskMailboxTestHarnessBuilder.java |   3 -
 .../runtime/tasks/StreamTaskSystemExitTest.java    |   2 -
 .../streaming/runtime/tasks/StreamTaskTest.java    |   5 -
 .../runtime/tasks/StreamTaskTestHarness.java       |   2 -
 .../AbstractStreamTableEnvironmentImpl.java        |   9 +-
 .../java/internal/StreamTableEnvironmentImpl.java  |   8 -
 .../internal/StreamTableEnvironmentImpl.scala      |  17 --
 .../utils/DummyStreamExecutionEnvironment.java     |  12 --
 .../runtime/stream/sql/AggregateITCase.scala       |   5 -
 .../planner/runtime/stream/sql/JoinITCase.scala    |   5 -
 .../sql/TemporalTableFunctionJoinITCase.scala      |   6 -
 .../flink/table/planner/utils/TableTestBase.scala  |  13 +-
 .../streaming/runtime/StreamTaskTimerITCase.java   |  27 ---
 .../test/streaming/runtime/TimestampITCase.java    |  28 ---
 52 files changed, 29 insertions(+), 1069 deletions(-)

diff --git 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
index 935dff6f780..22cd3f6447a 100644
--- 
a/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
+++ 
b/flink-connectors/flink-connector-files/src/test/java/org/apache/flink/connector/file/table/stream/StreamingFileWriterTest.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.connector.file.table.FileSystemTableSink;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
 import 
org.apache.flink.streaming.api.functions.sink.filesystem.OutputFileConfig;
 import org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy;
@@ -375,7 +374,6 @@ class StreamingFileWriterTest {
                         conf);
         OneInputStreamOperatorTestHarness<RowData, PartitionCommitInfo> 
harness =
                 new OneInputStreamOperatorTestHarness<>(writer, 1, 1, 0);
-        
harness.getStreamConfig().setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         return harness;
     }
 
diff --git 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
index 0ba3ea00e1f..61f0f4eccfe 100644
--- 
a/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
+++ 
b/flink-core/src/main/java/org/apache/flink/api/common/state/StateTtlConfig.java
@@ -84,10 +84,7 @@ public class StateTtlConfig implements Serializable {
     /** This option configures time scale to use for ttl. */
     @PublicEvolving
     public enum TtlTimeCharacteristic {
-        /**
-         * Processing time, see also <code>
-         * 
org.apache.flink.streaming.api.TimeCharacteristic.ProcessingTime</code>.
-         */
+        /** Processing time. */
         ProcessingTime
     }
 
diff --git 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
index fd68cf872f9..b9a1fe045fb 100644
--- 
a/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
+++ 
b/flink-datastream/src/main/java/org/apache/flink/datastream/impl/ExecutionEnvironmentImpl.java
@@ -59,7 +59,6 @@ import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 
-import static 
org.apache.flink.streaming.api.graph.StreamGraphGenerator.DEFAULT_TIME_CHARACTERISTIC;
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -328,11 +327,7 @@ public class ExecutionEnvironmentImpl implements 
ExecutionEnvironment {
         // We copy the transformation so that newly added transformations 
cannot intervene with the
         // stream graph generation.
         return new StreamGraphGenerator(
-                        new ArrayList<>(transformations),
-                        executionConfig,
-                        checkpointCfg,
-                        configuration)
-                .setTimeCharacteristic(DEFAULT_TIME_CHARACTERISTIC);
+                new ArrayList<>(transformations), executionConfig, 
checkpointCfg, configuration);
     }
 
     private PipelineExecutor getPipelineExecutor() throws Exception {
diff --git 
a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py 
b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
index 880c71e3145..5c85a5756af 100644
--- 
a/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
+++ 
b/flink-end-to-end-tests/flink-python-test/python/datastream/data_stream_job.py
@@ -22,7 +22,7 @@ from pyflink.common import Duration
 from pyflink.common.serialization import SimpleStringSchema
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import TimestampAssigner, 
WatermarkStrategy
-from pyflink.datastream import StreamExecutionEnvironment, TimeCharacteristic
+from pyflink.datastream import StreamExecutionEnvironment
 from pyflink.datastream.connectors import FlinkKafkaProducer, 
FlinkKafkaConsumer
 from pyflink.datastream.formats.json import JsonRowDeserializationSchema
 from pyflink.datastream.functions import KeyedProcessFunction
@@ -36,7 +36,6 @@ def python_data_stream_example():
     # are processed by the same worker and the collected result would be in 
order which is good for
     # assertion.
     env.set_parallelism(1)
-    env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
 
     type_info = Types.ROW_NAMED(['createTime', 'orderId', 'payAmount', 
'payPlatform', 'provinceId'],
                                 [Types.LONG(), Types.LONG(), Types.DOUBLE(), 
Types.INT(),
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
index 2394c3e1826..6505ca96d3f 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingMigrationTest.java
@@ -28,7 +28,6 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
@@ -117,7 +116,6 @@ public class ContinuousFileProcessingMigrationTest 
implements MigrationTest {
 
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
FileInputSplit> testHarness =
                 createHarness(format);
-        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
         testHarness.open();
         // create some state in the reader
         testHarness.processElement(new StreamRecord<>(split1));
@@ -152,7 +150,6 @@ public class ContinuousFileProcessingMigrationTest 
implements MigrationTest {
 
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
FileInputSplit> testHarness =
                 createHarness(format);
-        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
         testHarness.setup();
 
diff --git 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
index d32d63bc06e..92ceed2322c 100644
--- 
a/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
+++ 
b/flink-fs-tests/src/test/java/org/apache/flink/hdfstests/ContinuousFileProcessingTest.java
@@ -30,7 +30,6 @@ import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperator;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
 import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
@@ -41,15 +40,11 @@ import 
org.apache.flink.streaming.api.legacy.io.TextInputFormat;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor;
-import org.apache.flink.streaming.runtime.tasks.mailbox.MailboxDefaultAction;
-import 
org.apache.flink.streaming.runtime.tasks.mailbox.SteppingMailboxProcessor;
 import org.apache.flink.streaming.util.AbstractStreamOperatorTestHarness;
 import org.apache.flink.streaming.util.MockStreamingRuntimeContext;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
 import org.apache.flink.util.OperatingSystem;
 import org.apache.flink.util.Preconditions;
-import org.apache.flink.util.function.RunnableWithException;
 
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -75,7 +70,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.UUID;
-import java.util.concurrent.ConcurrentLinkedQueue;
 
 /**
  * Tests for the {@link ContinuousFileMonitoringFunction} and {@link 
ContinuousFileReaderOperator}.
@@ -166,180 +160,6 @@ public class ContinuousFileProcessingTest {
         }
     }
 
-    @Test
-    public void testFileReadingOperatorWithIngestionTime() throws Exception {
-        String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
-
-        Set<org.apache.hadoop.fs.Path> filesCreated = new HashSet<>();
-        Map<Integer, String> expectedFileContents = new HashMap<>();
-        Map<String, Long> modTimes = new HashMap<>();
-        for (int i = 0; i < NO_OF_FILES; i++) {
-            Tuple2<org.apache.hadoop.fs.Path, String> file =
-                    createFileAndFillWithData(testBasePath, "file", i, "This 
is test line.");
-            filesCreated.add(file.f0);
-            modTimes.put(file.f0.getName(), 
hdfs.getFileStatus(file.f0).getModificationTime());
-            expectedFileContents.put(i, file.f1);
-        }
-
-        TextInputFormat format = new TextInputFormat(new Path(testBasePath));
-
-        final long watermarkInterval = 10;
-
-        final OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> tester =
-                createHarness(format);
-        SteppingMailboxProcessor localMailbox = createLocalMailbox(tester);
-
-        
tester.getExecutionConfig().setAutoWatermarkInterval(watermarkInterval);
-        tester.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
-
-        tester.open();
-        Assert.assertEquals(TimeCharacteristic.IngestionTime, 
tester.getTimeCharacteristic());
-
-        tester.setProcessingTime(201);
-
-        // test that watermarks are correctly emitted
-        ConcurrentLinkedQueue<Object> output = tester.getOutput();
-        while (output.isEmpty()) {
-            localMailbox.runMailboxStep();
-        }
-        Assert.assertTrue(output.toString(), output.peek() instanceof 
Watermark);
-        Assert.assertEquals(200, ((Watermark) output.poll()).getTimestamp());
-
-        tester.setProcessingTime(301);
-        Assert.assertTrue(output.peek() instanceof Watermark);
-        Assert.assertEquals(300, ((Watermark) output.poll()).getTimestamp());
-
-        tester.setProcessingTime(401);
-        Assert.assertTrue(output.peek() instanceof Watermark);
-        Assert.assertEquals(400, ((Watermark) output.poll()).getTimestamp());
-
-        tester.setProcessingTime(501);
-        Assert.assertTrue(output.peek() instanceof Watermark);
-        Assert.assertEquals(500, ((Watermark) output.poll()).getTimestamp());
-
-        Assert.assertTrue(output.isEmpty());
-
-        // create the necessary splits for the test
-        FileInputSplit[] splits =
-                
format.createInputSplits(tester.getExecutionConfig().getParallelism());
-
-        // and feed them to the operator
-        Map<Integer, List<String>> actualFileContents = new HashMap<>();
-
-        long lastSeenWatermark = Long.MIN_VALUE;
-        int lineCounter = 0; // counter for the lines read from the splits
-        int watermarkCounter = 0;
-
-        for (FileInputSplit split : splits) {
-
-            // set the next "current processing time".
-            long nextTimestamp = tester.getProcessingTime() + 
watermarkInterval;
-            tester.setProcessingTime(nextTimestamp);
-
-            // send the next split to be read and wait until it is fully read, 
the +1 is for the
-            // watermark.
-            RunnableWithException runnableWithException =
-                    () ->
-                            tester.processElement(
-                                    new StreamRecord<>(
-                                            new TimestampedFileInputSplit(
-                                                    
modTimes.get(split.getPath().getName()),
-                                                    split.getSplitNumber(),
-                                                    split.getPath(),
-                                                    split.getStart(),
-                                                    split.getLength(),
-                                                    split.getHostnames())));
-            runnableWithException.run();
-
-            // NOTE: the following check works because each file fits in one 
split.
-            // In other case it would fail and wait forever.
-            // BUT THIS IS JUST FOR THIS TEST
-            while (tester.getOutput().isEmpty()
-                    || tester.getOutput().size() != (LINES_PER_FILE + 1)) {
-                localMailbox.runMailboxStep();
-            }
-
-            // verify that the results are the expected
-            for (Object line : tester.getOutput()) {
-
-                if (line instanceof StreamRecord) {
-
-                    @SuppressWarnings("unchecked")
-                    StreamRecord<String> element = (StreamRecord<String>) line;
-                    lineCounter++;
-
-                    Assert.assertEquals(nextTimestamp, element.getTimestamp());
-
-                    int fileIdx = 
Character.getNumericValue(element.getValue().charAt(0));
-                    List<String> content = actualFileContents.get(fileIdx);
-                    if (content == null) {
-                        content = new ArrayList<>();
-                        actualFileContents.put(fileIdx, content);
-                    }
-                    content.add(element.getValue() + "\n");
-                } else if (line instanceof Watermark) {
-                    long watermark = ((Watermark) line).getTimestamp();
-
-                    Assert.assertEquals(
-                            nextTimestamp - (nextTimestamp % 
watermarkInterval), watermark);
-                    Assert.assertTrue(watermark > lastSeenWatermark);
-                    watermarkCounter++;
-
-                    lastSeenWatermark = watermark;
-                } else {
-                    Assert.fail("Unknown element in the list.");
-                }
-            }
-
-            // clean the output to be ready for the next split
-            tester.getOutput().clear();
-        }
-
-        // now we are processing one split after the other,
-        // so all the elements must be here by now.
-        Assert.assertEquals(NO_OF_FILES * LINES_PER_FILE, lineCounter);
-
-        // because we expect one watermark per split.
-        Assert.assertEquals(splits.length, watermarkCounter);
-
-        // then close the reader gracefully so that the Long.MAX watermark is 
emitted
-        synchronized (tester.getCheckpointLock()) {
-            tester.close();
-        }
-
-        for (org.apache.hadoop.fs.Path file : filesCreated) {
-            hdfs.delete(file, false);
-        }
-
-        // check if the last element is the LongMax watermark (by now this 
must be the only element)
-        Assert.assertEquals(1, tester.getOutput().size());
-        Assert.assertTrue(tester.getOutput().peek() instanceof Watermark);
-        Assert.assertEquals(Long.MAX_VALUE, ((Watermark) 
tester.getOutput().poll()).getTimestamp());
-
-        // check if the elements are the expected ones.
-        Assert.assertEquals(expectedFileContents.size(), 
actualFileContents.size());
-        for (Integer fileIdx : expectedFileContents.keySet()) {
-            Assert.assertTrue(
-                    "file" + fileIdx + " not found", 
actualFileContents.keySet().contains(fileIdx));
-
-            List<String> cntnt = actualFileContents.get(fileIdx);
-            Collections.sort(
-                    cntnt,
-                    new Comparator<String>() {
-                        @Override
-                        public int compare(String o1, String o2) {
-                            return getLineNo(o1) - getLineNo(o2);
-                        }
-                    });
-
-            StringBuilder cntntStr = new StringBuilder();
-            for (String line : cntnt) {
-                cntntStr.append(line);
-            }
-            Assert.assertEquals(expectedFileContents.get(fileIdx), 
cntntStr.toString());
-        }
-    }
-
     private <T> OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
T> createHarness(
             FileInputFormat<T> format) throws Exception {
         ExecutionConfig config = new ExecutionConfig();
@@ -350,14 +170,6 @@ public class ContinuousFileProcessingTest {
                         .createSerializer(config.getSerializerConfig()));
     }
 
-    private SteppingMailboxProcessor createLocalMailbox(
-            OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
String> harness) {
-        return new SteppingMailboxProcessor(
-                MailboxDefaultAction.Controller::suspendDefaultAction,
-                harness.getTaskMailbox(),
-                StreamTaskActionExecutor.IMMEDIATE);
-    }
-
     @Test
     public void testFileReadingOperatorWithEventTime() throws Exception {
         String testBasePath = hdfsURI + "/" + UUID.randomUUID() + "/";
@@ -378,7 +190,6 @@ public class ContinuousFileProcessingTest {
 
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> 
tester =
                 createHarness(format);
-        tester.setTimeCharacteristic(TimeCharacteristic.EventTime);
         tester.open();
 
         // create the necessary splits for the test
@@ -483,7 +294,6 @@ public class ContinuousFileProcessingTest {
 
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
FileInputSplit>
                 initTestInstance = createHarness(format);
-        initTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
         initTestInstance.open();
 
         // create some state in the reader
@@ -504,7 +314,6 @@ public class ContinuousFileProcessingTest {
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, 
FileInputSplit>
                 restoredTestInstance =
                         createHarness(new BlockingFileInputFormat(latch, new 
Path(testBasePath)));
-        
restoredTestInstance.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
         restoredTestInstance.initializeState(snapshot);
         restoredTestInstance.open();
diff --git 
a/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst
 
b/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst
index ea8be1925d3..5680dc50f26 100644
--- 
a/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst
+++ 
b/flink-python/docs/reference/pyflink.datastream/stream_execution_environment.rst
@@ -55,8 +55,6 @@ access).
     StreamExecutionEnvironment.is_changelog_state_backend_enabled
     StreamExecutionEnvironment.set_default_savepoint_directory
     StreamExecutionEnvironment.get_default_savepoint_directory
-    StreamExecutionEnvironment.set_stream_time_characteristic
-    StreamExecutionEnvironment.get_stream_time_characteristic
     StreamExecutionEnvironment.configure
     StreamExecutionEnvironment.add_python_file
     StreamExecutionEnvironment.set_python_requirements
diff --git a/flink-python/docs/reference/pyflink.datastream/timer.rst 
b/flink-python/docs/reference/pyflink.datastream/timer.rst
index cc1ee50709e..88da53d76ce 100644
--- a/flink-python/docs/reference/pyflink.datastream/timer.rst
+++ b/flink-python/docs/reference/pyflink.datastream/timer.rst
@@ -37,17 +37,6 @@ TimerService
     TimerService.delete_event_time_timer
 
 
-TimeCharacteristic
-------------------
-
-.. currentmodule:: pyflink.datastream.time_characteristic
-
-.. autosummary::
-    :toctree: api/
-
-    TimeCharacteristic
-
-
 TimeDomain
 ----------
 
diff --git a/flink-python/pyflink/datastream/__init__.py 
b/flink-python/pyflink/datastream/__init__.py
index fce9563b7fc..f98fe98dc96 100644
--- a/flink-python/pyflink/datastream/__init__.py
+++ b/flink-python/pyflink/datastream/__init__.py
@@ -243,9 +243,6 @@ Classes to define formats used together with source & sink:
 
 Other important classes:
 
-    - :class:`TimeCharacteristic`:
-      Defines how the system determines time for time-dependent order and 
operations that depend
-      on time (such as time windows).
     - :class:`TimeDomain`:
       Specifies whether a firing timer is based on event time or processing 
time.
     - :class:`KeySelector`:
@@ -280,7 +277,6 @@ from pyflink.datastream.checkpoint_storage import 
(CheckpointStorage, JobManager
                                                    FileSystemCheckpointStorage,
                                                    CustomCheckpointStorage)
 from pyflink.datastream.stream_execution_environment import 
StreamExecutionEnvironment
-from pyflink.datastream.time_characteristic import TimeCharacteristic
 from pyflink.datastream.time_domain import TimeDomain
 from pyflink.datastream.functions import ProcessFunction
 from pyflink.datastream.timerservice import TimerService
@@ -335,7 +331,6 @@ __all__ = [
     'MergingWindowAssigner',
     'TriggerResult',
     'Trigger',
-    'TimeCharacteristic',
     'TimeDomain',
     'KeySelector',
     'Partitioner',
diff --git a/flink-python/pyflink/datastream/functions.py 
b/flink-python/pyflink/datastream/functions.py
index 29f38617f92..69e193fa167 100644
--- a/flink-python/pyflink/datastream/functions.py
+++ b/flink-python/pyflink/datastream/functions.py
@@ -645,8 +645,7 @@ class ProcessFunction(Function):
             """
             Timestamp of the element currently being processed or timestamp of 
a firing timer.
 
-            This might be None, for example if the time characteristic of your 
program is set to
-            TimeCharacteristic.ProcessTime.
+            This might be None, depending on the stream's watermark strategy.
             """
             pass
 
@@ -697,8 +696,7 @@ class KeyedProcessFunction(Function):
             """
             Timestamp of the element currently being processed or timestamp of 
a firing timer.
 
-            This might be None, for example if the time characteristic of your 
program is set to
-            TimeCharacteristic.ProcessTime.
+            This might be None, depending on the stream's watermark strategy.
             """
             pass
 
@@ -772,8 +770,7 @@ class CoProcessFunction(Function):
             """
             Timestamp of the element currently being processed or timestamp of 
a firing timer.
 
-            This might be None, for example if the time characteristic of your 
program is set to
-            TimeCharacteristic.ProcessTime.
+            This might be None, depending on the stream's watermark strategy.
             """
             pass
 
@@ -843,8 +840,7 @@ register a timer that will trigger an action in the future.
             """
             Timestamp of the element currently being processed or timestamp of 
a firing timer.
 
-            This might be None, for example if the time characteristic of your 
program is set to
-            TimeCharacteristic.ProcessTime.
+            This might be None, depending on the stream's watermark strategy.
             """
             pass
 
@@ -1347,8 +1343,7 @@ class BaseBroadcastProcessFunction(Function):
         def timestamp(self) -> int:
             """
             Timestamp of the element currently being processed or timestamp of 
a firing timer.
-            This might be None, for example if the time characteristic of your 
program is
-            set to :attr:`TimeCharacteristic.ProcessingTime`.
+            This might be None, depending on the stream's watermark strategy.
             """
             pass
 
diff --git a/flink-python/pyflink/datastream/stream_execution_environment.py 
b/flink-python/pyflink/datastream/stream_execution_environment.py
index 3e507d1be56..00e639ec379 100644
--- a/flink-python/pyflink/datastream/stream_execution_environment.py
+++ b/flink-python/pyflink/datastream/stream_execution_environment.py
@@ -35,7 +35,6 @@ from pyflink.datastream.connectors import Source
 from pyflink.datastream.data_stream import DataStream
 from pyflink.datastream.execution_mode import RuntimeExecutionMode
 from pyflink.datastream.functions import SourceFunction
-from pyflink.datastream.time_characteristic import TimeCharacteristic
 from pyflink.datastream.utils import ResultTypeQueryable
 from pyflink.java_gateway import get_gateway
 from pyflink.serializers import PickleSerializer
@@ -375,40 +374,6 @@ class StreamExecutionEnvironment(object):
         else:
             return j_path.toString()
 
-    def set_stream_time_characteristic(self, characteristic: 
TimeCharacteristic):
-        """
-        Sets the time characteristic for all streams create from this 
environment, e.g., processing
-        time, event time, or ingestion time.
-
-        If you set the characteristic to IngestionTime of EventTime this will 
set a default
-        watermark update interval of 200 ms. If this is not applicable for 
your application
-        you should change it using
-        :func:`pyflink.common.ExecutionConfig.set_auto_watermark_interval`.
-
-        Example:
-        ::
-
-            >>> 
env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
-
-        :param characteristic: The time characteristic, which could be
-                               :data:`TimeCharacteristic.ProcessingTime`,
-                               :data:`TimeCharacteristic.IngestionTime`,
-                               :data:`TimeCharacteristic.EventTime`.
-        """
-        j_characteristic = 
TimeCharacteristic._to_j_time_characteristic(characteristic)
-        
self._j_stream_execution_environment.setStreamTimeCharacteristic(j_characteristic)
-
-    def get_stream_time_characteristic(self) -> 'TimeCharacteristic':
-        """
-        Gets the time characteristic.
-
-        .. seealso:: :func:`set_stream_time_characteristic`
-
-        :return: The :class:`TimeCharacteristic`.
-        """
-        j_characteristic = 
self._j_stream_execution_environment.getStreamTimeCharacteristic()
-        return TimeCharacteristic._from_j_time_characteristic(j_characteristic)
-
     def configure(self, configuration: Configuration):
         """
         Sets all relevant options contained in the 
:class:`~pyflink.common.Configuration`. such as
diff --git a/flink-python/pyflink/datastream/tests/test_data_stream.py 
b/flink-python/pyflink/datastream/tests/test_data_stream.py
index 8d8de717639..d696c492f41 100644
--- a/flink-python/pyflink/datastream/tests/test_data_stream.py
+++ b/flink-python/pyflink/datastream/tests/test_data_stream.py
@@ -26,8 +26,8 @@ from pyflink.common import Row, Configuration
 from pyflink.common.time import Time
 from pyflink.common.typeinfo import Types
 from pyflink.common.watermark_strategy import WatermarkStrategy, 
TimestampAssigner
-from pyflink.datastream import (TimeCharacteristic, RuntimeContext, 
SlotSharingGroup,
-                                StreamExecutionEnvironment, 
RuntimeExecutionMode)
+from pyflink.datastream import (RuntimeContext, SlotSharingGroup, 
StreamExecutionEnvironment,
+                                RuntimeExecutionMode)
 from pyflink.datastream.data_stream import DataStream
 from pyflink.datastream.functions import (AggregateFunction, CoMapFunction, 
CoFlatMapFunction,
                                           MapFunction, FilterFunction, 
FlatMapFunction,
@@ -122,7 +122,6 @@ class DataStreamTests(object):
 
     def test_keyed_process_function_with_state(self):
         self.env.get_config().set_auto_watermark_interval(2000)
-        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
         data_stream = self.env.from_collection([(1, 'hi', '1603708211000'),
                                                 (2, 'hello', '1603708224000'),
                                                 (3, 'hi', '1603708226000'),
@@ -1090,7 +1089,6 @@ class ProcessDataStreamTests(DataStreamTests):
     def test_process_function(self):
         self.env.set_parallelism(1)
         self.env.get_config().set_auto_watermark_interval(2000)
-        self.env.set_stream_time_characteristic(TimeCharacteristic.EventTime)
         data_stream = self.env.from_collection([(1, '1603708211000'),
                                                 (2, '1603708224000'),
                                                 (3, '1603708226000'),
diff --git a/flink-python/pyflink/datastream/time_characteristic.py 
b/flink-python/pyflink/datastream/time_characteristic.py
deleted file mode 100644
index 9187b39f382..00000000000
--- a/flink-python/pyflink/datastream/time_characteristic.py
+++ /dev/null
@@ -1,95 +0,0 @@
-################################################################################
-#  Licensed to the Apache Software Foundation (ASF) under one
-#  or more contributor license agreements.  See the NOTICE file
-#  distributed with this work for additional information
-#  regarding copyright ownership.  The ASF licenses this file
-#  to you under the Apache License, Version 2.0 (the
-#  "License"); you may not use this file except in compliance
-#  with the License.  You may obtain a copy of the License at
-#
-#      http://www.apache.org/licenses/LICENSE-2.0
-#
-#  Unless required by applicable law or agreed to in writing, software
-#  distributed under the License is distributed on an "AS IS" BASIS,
-#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-#  See the License for the specific language governing permissions and
-# limitations under the License.
-################################################################################
-from enum import Enum
-
-from pyflink.java_gateway import get_gateway
-
-__all__ = ['TimeCharacteristic']
-
-
-class TimeCharacteristic(Enum):
-    """
-    The time characteristic defines how the system determines time for 
time-dependent
-    order and operations that depend on time (such as time windows).
-
-    :data:`ProcessingTime`:
-
-    Processing time for operators means that the operator uses the system 
clock of the machine
-    to determine the current time of the data stream. Processing-time windows 
trigger based
-    on wall-clock time and include whatever elements happen to have arrived at 
the operator at
-    that point in time.
-
-    Using processing time for window operations results in general in quite 
non-deterministic
-    results, because the contents of the windows depends on the speed in which 
elements arrive.
-    It is, however, the cheapest method of forming windows and the method that 
introduces the
-    least latency.
-
-    :data:`IngestionTime`:
-
-    Ingestion time means that the time of each individual element in the 
stream is determined
-    when the element enters the Flink streaming data flow. Operations like 
windows group the
-    elements based on that time, meaning that processing speed within the 
streaming dataflow
-    does not affect windowing, but only the speed at which sources receive 
elements.
-
-    Ingestion time is often a good compromise between processing time and 
event time.
-    It does not need any special manual form of watermark generation, and 
events are typically
-    not too much out-or-order when they arrive at operators; in fact, 
out-of-orderness can
-    only be introduced by streaming shuffles or split/join/union operations. 
The fact that
-    elements are not very much out-of-order means that the latency increase is 
moderate,
-    compared to event time.
-
-    :data:`EventTime`:
-
-    Event time means that the time of each individual element in the stream 
(also called event)
-    is determined by the event's individual custom timestamp. These timestamps 
either exist in
-    the elements from before they entered the Flink streaming dataflow, or are 
user-assigned at
-    the sources. The big implication of this is that it allows for elements to 
arrive in the
-    sources and in all operators out of order, meaning that elements with 
earlier timestamps may
-    arrive after elements with later timestamps.
-
-    Operators that window or order data with respect to event time must buffer 
data until they
-    can be sure that all timestamps for a certain time interval have been 
received. This is
-    handled by the so called "time watermarks".
-
-    Operations based on event time are very predictable - the result of 
windowing operations
-    is typically identical no matter when the window is executed and how fast 
the streams
-    operate. At the same time, the buffering and tracking of event time is 
also costlier than
-    operating with processing time, and typically also introduces more 
latency. The amount of
-    extra cost depends mostly on how much out of order the elements arrive, 
i.e., how long the
-    time span between the arrival of early and late elements is. With respect 
to the
-    "time watermarks", this means that the cost typically depends on how early 
or late the
-    watermarks can be generated for their timestamp.
-
-    In relation to :data:`IngestionTime`, the event time is similar, but 
refers the event's
-    original time, rather than the time assigned at the data source. 
Practically, that means
-    that event time has generally more meaning, but also that it takes longer 
to determine
-    that all elements for a certain time have arrived.
-    """
-
-    ProcessingTime = 0
-    IngestionTime = 1
-    EventTime = 2
-
-    @staticmethod
-    def _from_j_time_characteristic(j_time_characteristic) -> 
'TimeCharacteristic':
-        return TimeCharacteristic[j_time_characteristic.name()]
-
-    def _to_j_time_characteristic(self):
-        gateway = get_gateway()
-        JTimeCharacteristic = 
gateway.jvm.org.apache.flink.streaming.api.TimeCharacteristic
-        return getattr(JTimeCharacteristic, self.name)
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
deleted file mode 100644
index eaa04f34e75..00000000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/TimeCharacteristic.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.streaming.api;
-
-import org.apache.flink.annotation.PublicEvolving;
-import org.apache.flink.api.common.ExecutionConfig;
-import org.apache.flink.api.common.eventtime.WatermarkStrategy;
-
-/**
- * The time characteristic defines how the system determines time for 
time-dependent order and
- * operations that depend on time (such as time windows).
- *
- * @deprecated In Flink 1.12 the default stream time characteristic has been 
changed to {@link
- *     TimeCharacteristic#EventTime}, thus you don't need to call this method 
for enabling
- *     event-time support anymore. Explicitly using processing-time windows 
and timers works in
- *     event-time mode. If you need to disable watermarks, please use {@link
- *     ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using {@link
- *     TimeCharacteristic#IngestionTime}, please manually set an appropriate 
{@link
- *     WatermarkStrategy}. If you are using generic "time window" operations 
that change behaviour
- *     based on the time characteristic, please use equivalent operations that 
explicitly specify
- *     processing time or event time.
- */
-@PublicEvolving
-@Deprecated
-public enum TimeCharacteristic {
-
-    /**
-     * Processing time for operators means that the operator uses the system 
clock of the machine to
-     * determine the current time of the data stream. Processing-time windows 
trigger based on
-     * wall-clock time and include whatever elements happen to have arrived at 
the operator at that
-     * point in time.
-     *
-     * <p>Using processing time for window operations results in general in 
quite non-deterministic
-     * results, because the contents of the windows depends on the speed in 
which elements arrive.
-     * It is, however, the cheapest method of forming windows and the method 
that introduces the
-     * least latency.
-     */
-    ProcessingTime,
-
-    /**
-     * Ingestion time means that the time of each individual element in the 
stream is determined
-     * when the element enters the Flink streaming data flow. Operations like 
windows group the
-     * elements based on that time, meaning that processing speed within the 
streaming dataflow does
-     * not affect windowing, but only the speed at which sources receive 
elements.
-     *
-     * <p>Ingestion time is often a good compromise between processing time 
and event time. It does
-     * not need any special manual form of watermark generation, and events 
are typically not too
-     * much out-or-order when they arrive at operators; in fact, 
out-of-orderness can only be
-     * introduced by streaming shuffles or split/join/union operations. The 
fact that elements are
-     * not very much out-of-order means that the latency increase is moderate, 
compared to event
-     * time.
-     */
-    IngestionTime,
-
-    /**
-     * Event time means that the time of each individual element in the stream 
(also called event)
-     * is determined by the event's individual custom timestamp. These 
timestamps either exist in
-     * the elements from before they entered the Flink streaming dataflow, or 
are user-assigned at
-     * the sources. The big implication of this is that it allows for elements 
to arrive in the
-     * sources and in all operators out of order, meaning that elements with 
earlier timestamps may
-     * arrive after elements with later timestamps.
-     *
-     * <p>Operators that window or order data with respect to event time must 
buffer data until they
-     * can be sure that all timestamps for a certain time interval have been 
received. This is
-     * handled by the so called "time watermarks".
-     *
-     * <p>Operations based on event time are very predictable - the result of 
windowing operations
-     * is typically identical no matter when the window is executed and how 
fast the streams
-     * operate. At the same time, the buffering and tracking of event time is 
also costlier than
-     * operating with processing time, and typically also introduces more 
latency. The amount of
-     * extra cost depends mostly on how much out of order the elements arrive, 
i.e., how long the
-     * time span between the arrival of early and late elements is. With 
respect to the "time
-     * watermarks", this means that the cost typically depends on how early or 
late the watermarks
-     * can be generated for their timestamp.
-     *
-     * <p>In relation to {@link #IngestionTime}, the event time is similar, 
but refers the event's
-     * original time, rather than the time assigned at the data source. 
Practically, that means that
-     * event time has generally more meaning, but also that it takes longer to 
determine that all
-     * elements for a certain time have arrived.
-     */
-    EventTime
-}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
index 0bed680d10b..55d4bef793a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@@ -72,7 +72,6 @@ import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
@@ -96,7 +95,6 @@ import 
org.apache.flink.streaming.api.graph.StreamGraphGenerator;
 import org.apache.flink.streaming.api.operators.StreamSource;
 import org.apache.flink.streaming.api.operators.collect.CollectResultIterator;
 import org.apache.flink.streaming.api.transformations.CacheTransformation;
-import org.apache.flink.streaming.api.windowing.assigners.WindowAssigner;
 import org.apache.flink.util.AbstractID;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -150,10 +148,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         collectIterators.add(iterator);
     }
 
-    /** The time characteristic that is used if none other is set. */
-    private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC =
-            TimeCharacteristic.EventTime;
-
     /**
      * The environment of the context (local by default, cluster if invoked 
through command line).
      */
@@ -178,9 +172,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
 
     private final Map<AbstractID, CacheTransformation<?>> 
cachedTransformations = new HashMap<>();
 
-    /** The time characteristic used by the data streams. */
-    private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
-
     /**
      * Now we could not migrate this field to configuration. Because this 
object field remains
      * directly accessible and modifiable as it is exposed through a getter to 
users, allowing
@@ -703,50 +694,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
     //  Time characteristic
     // 
--------------------------------------------------------------------------------------------
 
-    /**
-     * Sets the time characteristic for all streams create from this 
environment, e.g., processing
-     * time, event time, or ingestion time.
-     *
-     * <p>If you set the characteristic to IngestionTime of EventTime this 
will set a default
-     * watermark update interval of 200 ms. If this is not applicable for your 
application you
-     * should change it using {@link 
ExecutionConfig#setAutoWatermarkInterval(long)}.
-     *
-     * @param characteristic The time characteristic.
-     * @deprecated In Flink 1.12 the default stream time characteristic has 
been changed to {@link
-     *     TimeCharacteristic#EventTime}, thus you don't need to call this 
method for enabling
-     *     event-time support anymore. Explicitly using processing-time 
windows and timers works in
-     *     event-time mode. If you need to disable watermarks, please use 
{@link
-     *     ExecutionConfig#setAutoWatermarkInterval(long)}. If you are using 
{@link
-     *     TimeCharacteristic#IngestionTime}, please manually set an 
appropriate {@link
-     *     WatermarkStrategy}. If you are using generic "time window" 
operations (for example
-     *     through {@link
-     *     
org.apache.flink.streaming.api.datastream.KeyedStream#window(WindowAssigner)} 
that change
-     *     behaviour based on the time characteristic, please use equivalent 
operations that
-     *     explicitly specify processing time or event time.
-     */
-    @PublicEvolving
-    @Deprecated
-    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) 
{
-        this.timeCharacteristic = Preconditions.checkNotNull(characteristic);
-        if (characteristic == TimeCharacteristic.ProcessingTime) {
-            getConfig().setAutoWatermarkInterval(0);
-        } else {
-            getConfig().setAutoWatermarkInterval(200);
-        }
-    }
-
-    /**
-     * Gets the time characteristic.
-     *
-     * @deprecated See {@link 
#setStreamTimeCharacteristic(TimeCharacteristic)} for deprecation
-     *     notice.
-     */
-    @PublicEvolving
-    @Deprecated
-    public TimeCharacteristic getStreamTimeCharacteristic() {
-        return timeCharacteristic;
-    }
-
     /**
      * Sets all relevant options contained in the {@link ReadableConfig}. It 
will reconfigure {@link
      * StreamExecutionEnvironment}, {@link ExecutionConfig} and {@link 
CheckpointConfig}.
@@ -2143,7 +2090,6 @@ public class StreamExecutionEnvironment implements 
AutoCloseable {
         // stream graph generation.
         return new StreamGraphGenerator(
                         new ArrayList<>(transformations), config, 
checkpointCfg, configuration)
-                .setTimeCharacteristic(getStreamTimeCharacteristic())
                 .setSlotSharingGroupResource(slotSharingGroupResources);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
index 6ce5ab9be85..e940c691231 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.java
@@ -93,8 +93,7 @@ public abstract class KeyedProcessFunction<K, I, O> extends 
AbstractRichFunction
         /**
          * Timestamp of the element currently being processed or timestamp of 
a firing timer.
          *
-         * <p>This might be {@code null}, for example if the time 
characteristic of your program is
-         * set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         * <p>This might be {@code null}, depending on the stream's watermark 
strategy.
          */
         public abstract Long timestamp();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
index fc310ab98c0..8976e5a554f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/ProcessFunction.java
@@ -92,8 +92,7 @@ public abstract class ProcessFunction<I, O> extends 
AbstractRichFunction {
         /**
          * Timestamp of the element currently being processed or timestamp of 
a firing timer.
          *
-         * <p>This might be {@code null}, for example if the time 
characteristic of your program is
-         * set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         * <p>This might be {@code null}, depending on the stream's watermark 
strategy.
          */
         public abstract Long timestamp();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
index e23b11e87ee..8e958c5f156 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/BaseBroadcastProcessFunction.java
@@ -44,8 +44,7 @@ public abstract class BaseBroadcastProcessFunction extends 
AbstractRichFunction
         /**
          * Timestamp of the element currently being processed or timestamp of 
a firing timer.
          *
-         * <p>This might be {@code null}, for example if the time 
characteristic of your program is
-         * set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         * <p>This might be {@code null}, depending on the stream's watermark 
strategy.
          */
         public abstract Long timestamp();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
index d0969078220..38c29977fde 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/CoProcessFunction.java
@@ -109,8 +109,7 @@ public abstract class CoProcessFunction<IN1, IN2, OUT> 
extends AbstractRichFunct
         /**
          * Timestamp of the element currently being processed or timestamp of 
a firing timer.
          *
-         * <p>This might be {@code null}, for example if the time 
characteristic of your program is
-         * set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         * <p>This might be {@code null}, depending on the stream's watermark 
strategy.
          */
         public abstract Long timestamp();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java
index 5228d1ff88b..a3ed1a88c18 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/co/KeyedCoProcessFunction.java
@@ -109,8 +109,7 @@ public abstract class KeyedCoProcessFunction<K, IN1, IN2, 
OUT> extends AbstractR
         /**
          * Timestamp of the element currently being processed or timestamp of 
a firing timer.
          *
-         * <p>This might be {@code null}, for example if the time 
characteristic of your program is
-         * set to {@link 
org.apache.flink.streaming.api.TimeCharacteristic#ProcessingTime}.
+         * <p>This might be {@code null}, depending on the stream's watermark 
strategy.
          */
         public abstract Long timestamp();
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
index 02baed7602d..02a23b40763 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperator.java
@@ -312,7 +312,6 @@ public class ContinuousFileReaderOperator<OUT, T extends 
TimestampedInputSplit>
 
         this.sourceContext =
                 StreamSourceContexts.getSourceContext(
-                        getOperatorConfig().getTimeCharacteristic(),
                         getProcessingTimeService(),
                         new Object(), // no actual locking needed
                         output,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
index 7414b822639..c8cf8fe2958 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamConfig.java
@@ -30,11 +30,9 @@ import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.core.memory.ManagedMemoryUseCase;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.util.CorruptConfigurationException;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
 import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
 import org.apache.flink.streaming.api.operators.SimpleOperatorFactory;
 import org.apache.flink.streaming.api.operators.StreamOperator;
@@ -129,9 +127,6 @@ public class StreamConfig implements Serializable {
 
     private static final String STATE_KEY_SERIALIZER = "statekeyser";
 
-    private static final ConfigOption<Integer> TIME_CHARACTERISTIC =
-            ConfigOptions.key("timechar").intType().defaultValue(-1);
-
     private static final String MANAGED_MEMORY_FRACTION_PREFIX = 
"managedMemFraction.";
 
     private static final String ATTRIBUTE = "attribute";
@@ -304,19 +299,6 @@ public class StreamConfig implements Serializable {
                 .collect(Collectors.toSet());
     }
 
-    public void setTimeCharacteristic(TimeCharacteristic characteristic) {
-        config.set(TIME_CHARACTERISTIC, characteristic.ordinal());
-    }
-
-    public TimeCharacteristic getTimeCharacteristic() {
-        int ordinal = config.get(TIME_CHARACTERISTIC, -1);
-        if (ordinal >= 0) {
-            return TimeCharacteristic.values()[ordinal];
-        } else {
-            throw new CorruptConfigurationException("time characteristic is 
not set");
-        }
-    }
-
     public void setTypeSerializerOut(TypeSerializer<?> serializer) {
         setTypeSerializer(TYPE_SERIALIZER_OUT_1, serializer);
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
index 0823ed29e32..95761820235 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraph.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.jobgraph.tasks.TaskInvokable;
 import org.apache.flink.runtime.state.CheckpointStorage;
 import org.apache.flink.runtime.state.StateBackend;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.lineage.LineageGraph;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -103,8 +102,6 @@ public class StreamGraph implements Pipeline {
     private final CheckpointConfig checkpointConfig;
     private SavepointRestoreSettings savepointRestoreSettings = 
SavepointRestoreSettings.none();
 
-    private TimeCharacteristic timeCharacteristic;
-
     private GlobalStreamExchangeMode globalExchangeMode;
 
     private boolean enableCheckpointsAfterTasksFinish;
@@ -242,14 +239,6 @@ public class StreamGraph implements Pipeline {
                 .orElse(new ArrayList<>());
     }
 
-    public TimeCharacteristic getTimeCharacteristic() {
-        return timeCharacteristic;
-    }
-
-    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-        this.timeCharacteristic = timeCharacteristic;
-    }
-
     public GlobalStreamExchangeMode getGlobalStreamExchangeMode() {
         return globalExchangeMode;
     }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
index a7a20892409..d6396f66166 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java
@@ -40,7 +40,6 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.lineage.LineageGraph;
 import org.apache.flink.streaming.api.lineage.LineageGraphUtils;
@@ -136,9 +135,6 @@ public class StreamGraphGenerator {
     public static final int DEFAULT_LOWER_BOUND_MAX_PARALLELISM =
             KeyGroupRangeAssignment.DEFAULT_LOWER_BOUND_MAX_PARALLELISM;
 
-    public static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC =
-            TimeCharacteristic.ProcessingTime;
-
     public static final String DEFAULT_STREAMING_JOB_NAME = "Flink Streaming 
Job";
 
     public static final String DEFAULT_BATCH_JOB_NAME = "Flink Batch Job";
@@ -156,8 +152,6 @@ public class StreamGraphGenerator {
     // Records the slot sharing groups and their corresponding fine-grained 
ResourceProfile
     private final Map<String, ResourceProfile> slotSharingGroupResources = new 
HashMap<>();
 
-    private TimeCharacteristic timeCharacteristic = 
DEFAULT_TIME_CHARACTERISTIC;
-
     private SavepointRestoreSettings savepointRestoreSettings;
 
     private boolean shouldExecuteInBatchMode;
@@ -228,11 +222,6 @@ public class StreamGraphGenerator {
         this.savepointRestoreSettings = 
SavepointRestoreSettings.fromConfiguration(configuration);
     }
 
-    public StreamGraphGenerator setTimeCharacteristic(TimeCharacteristic 
timeCharacteristic) {
-        this.timeCharacteristic = timeCharacteristic;
-        return this;
-    }
-
     /**
      * Specify fine-grained resource requirements for slot sharing groups.
      *
@@ -307,7 +296,6 @@ public class StreamGraphGenerator {
     private void configureStreamGraph(final StreamGraph graph) {
         checkNotNull(graph);
 
-        graph.setTimeCharacteristic(timeCharacteristic);
         
graph.setVertexDescriptionMode(configuration.get(PipelineOptions.VERTEX_DESCRIPTION_MODE));
         graph.setVertexNameIncludeIndexPrefix(
                 
configuration.get(PipelineOptions.VERTEX_NAME_INCLUDE_INDEX_PREFIX));
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 570335a8789..1059ee14885 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -1129,8 +1129,6 @@ public class StreamingJobGraphGenerator {
 
         config.setStreamOperatorFactory(vertex.getOperatorFactory());
 
-        config.setTimeCharacteristic(streamGraph.getTimeCharacteristic());
-
         final CheckpointConfig checkpointCfg = 
streamGraph.getCheckpointConfig();
 
         config.setStateBackend(streamGraph.getStateBackend());
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
index d40f6e6f1c8..444a3cfaf5b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSource.java
@@ -21,7 +21,6 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.MetricOptions;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.OperatorChain;
@@ -75,8 +74,6 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
             final OperatorChain<?, ?> operatorChain)
             throws Exception {
 
-        final TimeCharacteristic timeCharacteristic = 
getOperatorConfig().getTimeCharacteristic();
-
         final Configuration configuration =
                 
this.getContainingTask().getEnvironment().getTaskManagerInfo().getConfiguration();
         final long latencyTrackingInterval =
@@ -99,7 +96,6 @@ public class StreamSource<OUT, SRC extends 
SourceFunction<OUT>>
 
         this.ctx =
                 StreamSourceContexts.getSourceContext(
-                        timeCharacteristic,
                         getProcessingTimeService(),
                         lockingObject,
                         collector,
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
index f4d3416cec0..ba44c53b566 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/operators/StreamSourceContexts.java
@@ -18,7 +18,6 @@
 package org.apache.flink.streaming.api.operators;
 
 import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
@@ -38,18 +37,7 @@ import java.util.concurrent.ScheduledFuture;
 @Deprecated
 public class StreamSourceContexts {
 
-    /**
-     * Depending on the {@link TimeCharacteristic}, this method will return 
the adequate {@link
-     * SourceFunction.SourceContext}. That is:
-     *
-     * <ul>
-     *   <li>{@link TimeCharacteristic#IngestionTime} = {@code 
AutomaticWatermarkContext}
-     *   <li>{@link TimeCharacteristic#ProcessingTime} = {@code 
NonTimestampContext}
-     *   <li>{@link TimeCharacteristic#EventTime} = {@code 
ManualWatermarkContext}
-     * </ul>
-     */
     public static <OUT> SourceFunction.SourceContext<OUT> getSourceContext(
-            TimeCharacteristic timeCharacteristic,
             ProcessingTimeService processingTimeService,
             Object checkpointLock,
             Output<StreamRecord<OUT>> output,
@@ -57,37 +45,14 @@ public class StreamSourceContexts {
             long idleTimeout,
             boolean emitProgressiveWatermarks) {
 
-        final SourceFunction.SourceContext<OUT> ctx;
-        switch (timeCharacteristic) {
-            case EventTime:
-                ctx =
-                        new ManualWatermarkContext<>(
-                                output,
-                                processingTimeService,
-                                checkpointLock,
-                                idleTimeout,
-                                emitProgressiveWatermarks);
-
-                break;
-            case IngestionTime:
-                Preconditions.checkState(
-                        emitProgressiveWatermarks,
-                        "Ingestion time is not available when emitting 
progressive watermarks "
-                                + "is disabled.");
-                ctx =
-                        new AutomaticWatermarkContext<>(
-                                output,
-                                watermarkInterval,
-                                processingTimeService,
-                                checkpointLock,
-                                idleTimeout);
-                break;
-            case ProcessingTime:
-                ctx = new NonTimestampContext<>(checkpointLock, output);
-                break;
-            default:
-                throw new 
IllegalArgumentException(String.valueOf(timeCharacteristic));
-        }
+        final SourceFunction.SourceContext<OUT> ctx =
+                new ManualWatermarkContext<>(
+                        output,
+                        processingTimeService,
+                        checkpointLock,
+                        idleTimeout,
+                        emitProgressiveWatermarks);
+
         return new SwitchingOnClose<>(ctx);
     }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index e8591ad5cc8..b99061bfe90 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -79,7 +79,6 @@ import 
org.apache.flink.runtime.taskmanager.AsyncExceptionHandler;
 import org.apache.flink.runtime.taskmanager.AsynchronousException;
 import org.apache.flink.runtime.taskmanager.DispatcherThreadFactory;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.InternalTimeServiceManager;
@@ -1206,8 +1205,8 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
     }
 
     boolean isSerializingTimestamps() {
-        TimeCharacteristic tc = configuration.getTimeCharacteristic();
-        return tc == TimeCharacteristic.EventTime | tc == 
TimeCharacteristic.IngestionTime;
+        // Only used in StreamIterationHead and will soon be removed
+        return true;
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java
index f8779fe9df9..0e65b1ea50d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/functions/source/ContinuousFileReaderOperatorTest.java
@@ -24,7 +24,6 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.mailbox.Mail;
 import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
@@ -41,7 +40,6 @@ class ContinuousFileReaderOperatorTest {
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> 
harness =
                 createHarness(failingFormat());
         harness.getExecutionConfig().setAutoWatermarkInterval(10);
-        harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
         assertThatThrownBy(
                         () -> {
@@ -59,7 +57,6 @@ class ContinuousFileReaderOperatorTest {
         OneInputStreamOperatorTestHarness<TimestampedFileInputSplit, String> 
harness =
                 createHarness(failingFormat());
         harness.getExecutionConfig().setAutoWatermarkInterval(10);
-        harness.setTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
         assertThatThrownBy(
                         () -> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
index 367cd0ddfa9..6210e4860ee 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/api/operators/StreamSourceContextIdleDetectionTests.java
@@ -18,7 +18,6 @@
 
 package org.apache.flink.streaming.api.operators;
 
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
@@ -91,7 +90,6 @@ class StreamSourceContextIdleDetectionTests {
 
         SourceFunction.SourceContext<String> context =
                 StreamSourceContexts.getSourceContext(
-                        TimeCharacteristic.EventTime,
                         processingTimeService,
                         new Object(),
                         new CollectorOutput<>(output),
@@ -163,177 +161,6 @@ class StreamSourceContextIdleDetectionTests {
         }
     }
 
-    /**
-     * Test scenario (idleTimeout = 100, watermarkInterval = 40): (1) Start 
from 20 as initial time.
-     * (2) As soon as time reaches 120, status should have been toggled to 
IDLE. (3) After some
-     * arbitrary time (until 320), the status should remain IDLE, and no 
watermarks should have been
-     * emitted. (4) Emit a record at 330. Status should become ACTIVE. This 
should schedule a
-     * idleness detection to be fired at 430. (5) Emit another record at 350 
(which is before the
-     * next check). This should make the idleness check pass. (6) Advance time 
to 430 and trigger
-     * idleness detection. The status should still be ACTIVE due to step (5). 
This should schedule a
-     * idleness detection to be fired at 530. (7) Advance time to 460, in 
which a watermark emission
-     * task should be fired. Idleness detection should have been 
"piggy-backed" in the task,
-     * allowing the status to be toggled to IDLE before the next actual idle 
detection task at 530.
-     *
-     * <p>Inline comments will refer to the corresponding tested steps in the 
scenario.
-     */
-    @TestTemplate
-    void testAutomaticWatermarkContext() throws Exception {
-        long watermarkInterval = 40;
-        long idleTimeout = 100;
-        long initialTime = 20;
-
-        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
-        processingTimeService.setCurrentTime(initialTime);
-
-        final List<StreamElement> output = new ArrayList<>();
-        final List<StreamElement> expectedOutput = new ArrayList<>();
-
-        SourceFunction.SourceContext<String> context =
-                StreamSourceContexts.getSourceContext(
-                        TimeCharacteristic.IngestionTime,
-                        processingTimeService,
-                        new Object(),
-                        new CollectorOutput<String>(output),
-                        watermarkInterval,
-                        idleTimeout,
-                        true);
-
-        // -------------------------- begin test scenario 
--------------------------
-
-        // corresponds to step (2) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + watermarkInterval);
-        expectedOutput.add(
-                new Watermark(
-                        processingTimeService.getCurrentProcessingTime()
-                                - 
(processingTimeService.getCurrentProcessingTime()
-                                        % watermarkInterval)));
-        processingTimeService.setCurrentTime(initialTime + 2 * 
watermarkInterval);
-        expectedOutput.add(
-                new Watermark(
-                        processingTimeService.getCurrentProcessingTime()
-                                - 
(processingTimeService.getCurrentProcessingTime()
-                                        % watermarkInterval)));
-        processingTimeService.setCurrentTime(initialTime + idleTimeout);
-        expectedOutput.add(WatermarkStatus.IDLE);
-        assertThat(output).isEqualTo(expectedOutput);
-
-        // corresponds to step (3) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + 3 * 
watermarkInterval);
-        processingTimeService.setCurrentTime(initialTime + 4 * 
watermarkInterval);
-        processingTimeService.setCurrentTime(initialTime + 2 * idleTimeout);
-        processingTimeService.setCurrentTime(initialTime + 6 * 
watermarkInterval);
-        processingTimeService.setCurrentTime(initialTime + 7 * 
watermarkInterval);
-        processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout);
-        assertThat(output).isEqualTo(expectedOutput);
-
-        // corresponds to step (4) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 
idleTimeout / 10);
-        switch (testMethod) {
-            case COLLECT:
-                expectedOutput.add(WatermarkStatus.ACTIVE);
-                context.collect("msg");
-                expectedOutput.add(
-                        new StreamRecord<>(
-                                "msg", 
processingTimeService.getCurrentProcessingTime()));
-                expectedOutput.add(
-                        new Watermark(
-                                
processingTimeService.getCurrentProcessingTime()
-                                        - 
(processingTimeService.getCurrentProcessingTime()
-                                                % watermarkInterval)));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case COLLECT_WITH_TIMESTAMP:
-                expectedOutput.add(WatermarkStatus.ACTIVE);
-                context.collectWithTimestamp(
-                        "msg", 
processingTimeService.getCurrentProcessingTime());
-                expectedOutput.add(
-                        new StreamRecord<>(
-                                "msg", 
processingTimeService.getCurrentProcessingTime()));
-                expectedOutput.add(
-                        new Watermark(
-                                
processingTimeService.getCurrentProcessingTime()
-                                        - 
(processingTimeService.getCurrentProcessingTime()
-                                                % watermarkInterval)));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case EMIT_WATERMARK:
-                // for emitWatermark, since the watermark will be blocked,
-                // it should not make the status become active;
-                // from here on, the status should remain idle for the 
emitWatermark variant test
-                context.emitWatermark(
-                        new 
Watermark(processingTimeService.getCurrentProcessingTime()));
-                assertThat(output).isEqualTo(expectedOutput);
-        }
-
-        // corresponds to step (5) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + 8 * 
watermarkInterval);
-        processingTimeService.setCurrentTime(initialTime + 3 * idleTimeout + 3 
* idleTimeout / 10);
-        switch (testMethod) {
-            case COLLECT:
-                context.collect("msg");
-                expectedOutput.add(
-                        new StreamRecord<>(
-                                "msg", 
processingTimeService.getCurrentProcessingTime()));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case COLLECT_WITH_TIMESTAMP:
-                context.collectWithTimestamp(
-                        "msg", 
processingTimeService.getCurrentProcessingTime());
-                expectedOutput.add(
-                        new StreamRecord<>(
-                                "msg", 
processingTimeService.getCurrentProcessingTime()));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case EMIT_WATERMARK:
-                context.emitWatermark(
-                        new 
Watermark(processingTimeService.getCurrentProcessingTime()));
-                assertThat(output).isEqualTo(expectedOutput);
-        }
-
-        processingTimeService.setCurrentTime(initialTime + 9 * 
watermarkInterval);
-        switch (testMethod) {
-            case COLLECT:
-            case COLLECT_WITH_TIMESTAMP:
-                expectedOutput.add(
-                        new Watermark(
-                                
processingTimeService.getCurrentProcessingTime()
-                                        - 
(processingTimeService.getCurrentProcessingTime()
-                                                % watermarkInterval)));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case EMIT_WATERMARK:
-                assertThat(output).isEqualTo(expectedOutput);
-        }
-
-        processingTimeService.setCurrentTime(initialTime + 10 * 
watermarkInterval);
-        switch (testMethod) {
-            case COLLECT:
-            case COLLECT_WITH_TIMESTAMP:
-                expectedOutput.add(
-                        new Watermark(
-                                
processingTimeService.getCurrentProcessingTime()
-                                        - 
(processingTimeService.getCurrentProcessingTime()
-                                                % watermarkInterval)));
-                assertThat(output).isEqualTo(expectedOutput);
-                break;
-            case EMIT_WATERMARK:
-                assertThat(output).isEqualTo(expectedOutput);
-        }
-
-        // corresponds to step (6) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + 4 * idleTimeout + 
idleTimeout / 10);
-        assertThat(output).isEqualTo(expectedOutput);
-
-        // corresponds to step (7) of scenario (please see method-level 
Javadoc comment)
-        processingTimeService.setCurrentTime(initialTime + 11 * 
watermarkInterval);
-        // emit watermark does not change the previous status
-        if (testMethod != TestMethod.EMIT_WATERMARK) {
-            expectedOutput.add(WatermarkStatus.IDLE);
-        }
-        assertThat(output).isEqualTo(expectedOutput);
-    }
-
     @Parameters(name = "TestMethod = {0}")
     @SuppressWarnings("unchecked")
     private static Collection<TestMethod[]> timeCharacteristic() {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
index cce23535034..c4ee20a42cb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/ContinuousFileProcessingRescalingTest.java
@@ -25,7 +25,6 @@ import org.apache.flink.api.java.typeutils.TypeExtractor;
 import org.apache.flink.core.fs.FileInputSplit;
 import org.apache.flink.core.fs.Path;
 import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.ContinuousFileReaderOperatorFactory;
 import 
org.apache.flink.streaming.api.functions.source.TimestampedFileInputSplit;
 import org.apache.flink.streaming.api.watermark.Watermark;
@@ -180,7 +179,6 @@ class ContinuousFileProcessingRescalingTest {
                         maxParallelism,
                         noOfTasks,
                         taskIdx);
-        testHarness.setTimeCharacteristic(TimeCharacteristic.EventTime);
         return testHarness;
     }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
index 529e7f1bc97..8fd1d2b4682 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorLatencyMetricsTest.java
@@ -27,7 +27,6 @@ import 
org.apache.flink.runtime.operators.testutils.MockEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
 import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.Output;
@@ -237,7 +236,6 @@ class StreamSourceOperatorLatencyMetricsTest {
         StreamConfig cfg = new StreamConfig(new Configuration());
         cfg.setStateBackend(new HashMapStateBackend());
 
-        cfg.setTimeCharacteristic(TimeCharacteristic.EventTime);
         cfg.setOperatorID(new OperatorID());
         cfg.serializeAllConfigs();
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
index 2b0cbea8fa9..19a0e7d8404 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/operators/windowing/TimeWindowTranslationTest.java
@@ -22,7 +22,6 @@ import org.apache.flink.api.common.functions.ReduceFunction;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.state.ReducingStateDescriptor;
 import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.WindowedStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -55,7 +54,6 @@ class TimeWindowTranslationTest {
     @Test
     void testAlignedWindowDeprecation() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         DataStream<Tuple2<String, Integer>> source =
                 env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -105,7 +103,6 @@ class TimeWindowTranslationTest {
     @SuppressWarnings("rawtypes")
     void testReduceEventTimeWindows() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
         DataStream<Tuple2<String, Integer>> source =
                 env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
@@ -133,7 +130,6 @@ class TimeWindowTranslationTest {
     @SuppressWarnings("rawtypes")
     void testApplyEventTimeWindows() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
 
         DataStream<Tuple2<String, Integer>> source =
                 env.fromData(Tuple2.of("hello", 1), Tuple2.of("hello", 2));
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
index a831ffa64ea..8e8edd9ce3e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/runtime/tasks/InterruptSensitiveRestoreTest.java
@@ -74,7 +74,6 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -139,7 +138,6 @@ class InterruptSensitiveRestoreTest {
         IN_RESTORE_LATCH.reset();
         Configuration taskConfig = new Configuration();
         StreamConfig cfg = new StreamConfig(taskConfig);
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         switch (mode) {
             case OPERATOR_MANAGED:
             case OPERATOR_RAW:
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
index 4f6791e3268..2d93e3375f3 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/streaming/util/AbstractStreamOperatorTestHarness.java
@@ -56,7 +56,6 @@ import 
org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
 import org.apache.flink.runtime.state.ttl.MockTtlTimeProvider;
 import org.apache.flink.runtime.state.ttl.TtlTimeProvider;
 import org.apache.flink.runtime.taskmanager.NoOpTaskOperatorEventGateway;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperatorTest;
@@ -793,14 +792,6 @@ public class AbstractStreamOperatorTestHarness<OUT> 
implements AutoCloseable {
         return processingTimeService.getCurrentProcessingTime();
     }
 
-    public void setTimeCharacteristic(TimeCharacteristic timeCharacteristic) {
-        this.config.setTimeCharacteristic(timeCharacteristic);
-    }
-
-    public TimeCharacteristic getTimeCharacteristic() {
-        return this.config.getTimeCharacteristic();
-    }
-
     public boolean wasFailedExternally() {
         return wasFailedExternally;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
index 89a1104a3b6..86d7870b24e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/operators/AbstractUdfStreamOperatorLifecycleTest.java
@@ -32,7 +32,6 @@ import org.apache.flink.runtime.shuffle.ShuffleEnvironment;
 import org.apache.flink.runtime.state.StateInitializationContext;
 import org.apache.flink.runtime.state.StateSnapshotContext;
 import org.apache.flink.runtime.taskmanager.Task;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
@@ -161,7 +160,6 @@ class AbstractUdfStreamOperatorLifecycleTest {
 
         cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, 
true));
         cfg.setOperatorID(new OperatorID());
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         try (ShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder().build()) {
             Task task =
@@ -192,7 +190,6 @@ class AbstractUdfStreamOperatorLifecycleTest {
         MockSourceFunction srcFun = new MockSourceFunction();
         cfg.setStreamOperator(new LifecycleTrackingStreamSource<>(srcFun, 
false));
         cfg.setOperatorID(new OperatorID());
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         try (ShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder().build()) {
             Task task =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
index eafe91b5940..93d526a4d04 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/operators/StreamSourceOperatorWatermarksTest.java
@@ -18,44 +18,23 @@
 
 package org.apache.flink.streaming.runtime.operators;
 
-import org.apache.flink.api.common.ExecutionConfig;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeinfo.TypeInformation;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.jobgraph.OperatorID;
-import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
-import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
-import org.apache.flink.runtime.state.storage.JobManagerCheckpointStorage;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichSourceFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
-import org.apache.flink.streaming.api.operators.Output;
 import org.apache.flink.streaming.api.operators.StreamSource;
-import org.apache.flink.streaming.api.operators.StreamSourceContexts;
 import org.apache.flink.streaming.api.watermark.Watermark;
-import org.apache.flink.streaming.runtime.streamrecord.StreamElement;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.SourceStreamTask;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTestHarness;
-import org.apache.flink.streaming.runtime.tasks.TestProcessingTimeService;
-import org.apache.flink.streaming.runtime.tasks.TimerService;
-import org.apache.flink.streaming.util.CollectorOutput;
-import org.apache.flink.streaming.util.MockStreamTask;
-import org.apache.flink.streaming.util.MockStreamTaskBuilder;
 import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.jupiter.api.Test;
 
-import java.util.ArrayList;
-import java.util.List;
-
-import static 
org.apache.flink.streaming.api.operators.StreamOperatorUtils.setupStreamOperator;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
-import static org.mockito.Mockito.mock;
 
 /** Tests for {@link StreamSource} operators. */
 @SuppressWarnings("serial")
@@ -126,86 +105,8 @@ class StreamSourceOperatorWatermarksTest {
         assertThat(testHarness.getOutput()).isEmpty();
     }
 
-    @Test
-    void testAutomaticWatermarkContext() throws Exception {
-
-        // regular stream source operator
-        final StreamSource<String, InfiniteSource<String>> operator =
-                new StreamSource<>(new InfiniteSource<>());
-
-        long watermarkInterval = 10;
-        TestProcessingTimeService processingTimeService = new 
TestProcessingTimeService();
-        processingTimeService.setCurrentTime(0);
-
-        MockStreamTask<?, ?> task =
-                setupSourceOperator(
-                        operator,
-                        TimeCharacteristic.IngestionTime,
-                        watermarkInterval,
-                        processingTimeService);
-
-        final List<StreamElement> output = new ArrayList<>();
-
-        StreamSourceContexts.getSourceContext(
-                TimeCharacteristic.IngestionTime,
-                processingTimeService,
-                task.getCheckpointLock(),
-                new CollectorOutput<String>(output),
-                operator.getExecutionConfig().getAutoWatermarkInterval(),
-                -1,
-                true);
-
-        // periodically emit the watermarks
-        // even though we start from 1 the watermark are still
-        // going to be aligned with the watermark interval.
-
-        for (long i = 1; i < 100; i += watermarkInterval) {
-            processingTimeService.setCurrentTime(i);
-        }
-
-        assertThat(output).hasSize(9);
-
-        long nextWatermark = 0;
-        for (StreamElement el : output) {
-            nextWatermark += watermarkInterval;
-            Watermark wm = (Watermark) el;
-            assertThat(wm.getTimestamp()).isEqualTo(nextWatermark);
-        }
-    }
-
     // ------------------------------------------------------------------------
 
-    @SuppressWarnings("unchecked")
-    private static <T> MockStreamTask setupSourceOperator(
-            StreamSource<T, ?> operator,
-            TimeCharacteristic timeChar,
-            long watermarkInterval,
-            final TimerService timeProvider)
-            throws Exception {
-
-        ExecutionConfig executionConfig = new ExecutionConfig();
-        executionConfig.setAutoWatermarkInterval(watermarkInterval);
-
-        StreamConfig cfg = new StreamConfig(new Configuration());
-        cfg.setStateBackend(new HashMapStateBackend());
-        cfg.setCheckpointStorage(new JobManagerCheckpointStorage());
-
-        cfg.setTimeCharacteristic(timeChar);
-        cfg.setOperatorID(new OperatorID());
-
-        Environment env = new DummyEnvironment("MockTwoInputTask", 1, 0);
-
-        MockStreamTask mockTask =
-                new MockStreamTaskBuilder(env)
-                        .setConfig(cfg)
-                        .setExecutionConfig(executionConfig)
-                        .setTimerService(timeProvider)
-                        .build();
-
-        setupStreamOperator(operator, mockTask, cfg, (Output<StreamRecord<T>>) 
mock(Output.class));
-        return mockTask;
-    }
-
     private static <T> StreamTaskTestHarness<T> setupSourceStreamTask(
             StreamSource<T, ?> sourceOperator, TypeInformation<T> outputType) {
 
@@ -236,7 +137,6 @@ class StreamSourceOperatorWatermarksTest {
         StreamConfig streamConfig = testHarness.getStreamConfig();
         streamConfig.setStreamOperator(sourceOperator);
         streamConfig.setOperatorID(new OperatorID());
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
 
         return testHarness;
     }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
index 1491f4142fe..822947e71e4 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SourceStreamTaskTest.java
@@ -50,7 +50,6 @@ import 
org.apache.flink.runtime.operators.testutils.ExpectedTestException;
 import org.apache.flink.runtime.state.CheckpointStorageLocationReference;
 import org.apache.flink.runtime.taskmanager.CheckpointResponder;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
 import 
org.apache.flink.streaming.api.functions.source.legacy.FromElementsFunction;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
@@ -236,7 +235,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase 
{
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         testHarness.invoke();
         testHarness.waitForTaskCompletion();
@@ -275,7 +273,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase 
{
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         ConcurrentLinkedQueue<Object> expectedOutput = new 
ConcurrentLinkedQueue<>();
 
@@ -340,7 +337,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase 
{
                 .finish();
 
         StreamConfig streamConfig = testHarness.getStreamConfig();
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         testHarness.invoke();
         CancelLockingSource.awaitRunning();
@@ -452,9 +448,6 @@ class SourceStreamTaskTest extends SourceStreamTaskTestBase 
{
                         STRING_TYPE_INFO.createSerializer(new 
SerializerConfigImpl()))
                 .finish();
 
-        StreamConfig streamConfig = testHarness.getStreamConfig();
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
         testHarness.invoke();
         try {
             testHarness.waitForTaskCompletion();
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
index e16b2f73f49..f5098c5b971 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskMailboxTestHarnessBuilder.java
@@ -45,7 +45,6 @@ import 
org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.taskmanager.TestCheckpointResponder;
 import org.apache.flink.runtime.throughput.BufferDebloatConfiguration;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamConfig.InputConfig;
@@ -117,7 +116,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
             TypeInformation<OUT> outputType) {
         this.taskFactory = checkNotNull(taskFactory);
         outputSerializer = 
outputType.createSerializer(executionConfig.getSerializerConfig());
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
     }
 
     public <T> StreamTaskMailboxTestHarnessBuilder<OUT> modifyExecutionConfig(
@@ -381,7 +379,6 @@ public class StreamTaskMailboxTestHarnessBuilder<OUT> {
                         ResultPartitionType.PIPELINED_BOUNDED));
 
         StreamConfig sourceConfig = new StreamConfig(new Configuration());
-        
sourceConfig.setTimeCharacteristic(streamConfig.getTimeCharacteristic());
         sourceConfig.setVertexNonChainedOutputs(streamOutputsInOrder);
         sourceConfig.setChainedOutputs(chainedOutputs);
         sourceConfig.setTypeSerializerOut(sourceInput.getSourceSerializer());
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
index 94430105a72..5f254a9ae57 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskSystemExitTest.java
@@ -58,7 +58,6 @@ import org.apache.flink.runtime.taskmanager.Task;
 import org.apache.flink.runtime.taskmanager.TaskManagerActions;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
@@ -159,7 +158,6 @@ class StreamTaskSystemExitTest {
         final StreamConfig streamConfig = new StreamConfig(taskConfiguration);
         streamConfig.setOperatorID(new OperatorID());
         streamConfig.setStreamOperator(operator);
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.ProcessingTime); 
// for source run
         streamConfig.serializeAllConfigs();
 
         final JobInformation jobInformation =
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
index 33a339c5d51..9593fa85418 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTest.java
@@ -114,7 +114,6 @@ import org.apache.flink.runtime.taskmanager.TestTaskBuilder;
 import org.apache.flink.runtime.testutils.ExceptionallyDoneFuture;
 import org.apache.flink.runtime.throughput.ThroughputCalculator;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
 import 
org.apache.flink.streaming.api.functions.source.legacy.RichParallelSourceFunction;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
@@ -478,7 +477,6 @@ public class StreamTaskTest {
         final StreamConfig cfg = new StreamConfig(new Configuration());
         cfg.setOperatorID(new OperatorID(4711L, 42L));
         cfg.setStreamOperator(new SlowlyDeserializingOperator());
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
         cfg.serializeAllConfigs();
 
         final TaskManagerActions taskManagerActions = spy(new 
NoOpTaskManagerActions());
@@ -520,7 +518,6 @@ public class StreamTaskTest {
         TestStreamSource<Long, MockSourceFunction> streamSource =
                 new TestStreamSource<>(new MockSourceFunction());
         cfg.setStreamOperator(streamSource);
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         try (ShuffleEnvironment shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder().build()) {
             Task task =
@@ -561,7 +558,6 @@ public class StreamTaskTest {
         TestStreamSource<Long, MockSourceFunction> streamSource =
                 new TestStreamSource<>(new MockSourceFunction());
         cfg.setStreamOperator(streamSource);
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         try (NettyShuffleEnvironment shuffleEnvironment =
                 new NettyShuffleEnvironmentBuilder().build()) {
@@ -1532,7 +1528,6 @@ public class StreamTaskTest {
 
         FailedSource failedSource = new FailedSource();
         cfg.setStreamOperator(new TestStreamSource<String, 
FailedSource>(failedSource));
-        cfg.setTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         try (NettyShuffleEnvironment shuffleEnvironment =
                 new NettyShuffleEnvironmentBuilder().build()) {
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
index cc5597cca10..d4e072cfe8e 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskTestHarness.java
@@ -51,7 +51,6 @@ import org.apache.flink.runtime.state.TestLocalRecoveryConfig;
 import org.apache.flink.runtime.state.TestTaskStateManager;
 import org.apache.flink.runtime.taskmanager.TaskManagerRuntimeInfo;
 import org.apache.flink.runtime.util.TestingTaskManagerRuntimeInfo;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.graph.NonChainedOutput;
 import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.graph.StreamNode;
@@ -227,7 +226,6 @@ public class StreamTaskTestHarness<OUT> {
         Preconditions.checkState(!setupCalled, "This harness was already 
setup.");
         setupCalled = true;
         streamConfig.setChainStart();
-        streamConfig.setTimeCharacteristic(TimeCharacteristic.EventTime);
         streamConfig.setNumberOfOutputs(1);
         streamConfig.setTypeSerializerOut(outputSerializer);
         streamConfig.setVertexID(0);
diff --git 
a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
index c69c25983ff..debf4c4cbaf 100644
--- 
a/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-bridge-base/src/main/java/org/apache/flink/table/api/bridge/internal/AbstractStreamTableEnvironmentImpl.java
@@ -24,7 +24,6 @@ import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.api.java.typeutils.TupleTypeInfo;
 import org.apache.flink.api.java.typeutils.TypeExtractor;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.Schema;
@@ -298,13 +297,11 @@ public abstract class AbstractStreamTableEnvironmentImpl 
extends TableEnvironmen
     }
 
     protected void validateTimeCharacteristic(boolean isRowtimeDefined) {
-        if (isRowtimeDefined
-                && executionEnvironment.getStreamTimeCharacteristic()
-                        != TimeCharacteristic.EventTime) {
+        if (isRowtimeDefined && 
executionEnvironment.getConfig().getAutoWatermarkInterval() <= 0) {
             throw new ValidationException(
                     String.format(
-                            "A rowtime attribute requires an EventTime time 
characteristic in stream environment. But is: %s",
-                            
executionEnvironment.getStreamTimeCharacteristic()));
+                            "A rowtime attribute requires a positive watermark 
interval in stream environment. But is: %s",
+                            
executionEnvironment.getConfig().getAutoWatermarkInterval()));
         }
     }
 
diff --git 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
index cb65d4ea7fb..89aa1238d22 100644
--- 
a/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java-bridge/src/main/java/org/apache/flink/table/api/bridge/java/internal/StreamTableEnvironmentImpl.java
@@ -52,12 +52,10 @@ import org.apache.flink.table.functions.AggregateFunction;
 import org.apache.flink.table.functions.TableAggregateFunction;
 import org.apache.flink.table.functions.TableFunction;
 import org.apache.flink.table.functions.UserDefinedFunctionHelper;
-import org.apache.flink.table.legacy.sources.TableSource;
 import org.apache.flink.table.module.ModuleManager;
 import org.apache.flink.table.operations.ExternalQueryOperation;
 import org.apache.flink.table.operations.OutputConversionModifyOperation;
 import org.apache.flink.table.resource.ResourceManager;
-import org.apache.flink.table.sources.TableSourceValidation;
 import org.apache.flink.table.types.AbstractDataType;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.utils.TypeConversions;
@@ -392,10 +390,4 @@ public final class StreamTableEnvironmentImpl extends 
AbstractStreamTableEnviron
                         OutputConversionModifyOperation.UpdateMode.RETRACT);
         return toStreamInternal(table, modifyOperation);
     }
-
-    @Override
-    protected void validateTableSource(TableSource<?> tableSource) {
-        super.validateTableSource(tableSource);
-        
validateTimeCharacteristic(TableSourceValidation.hasRowtimeAttribute(tableSource));
-    }
 }
diff --git 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
index 3a90a0f650d..5c3e2c556c4 100644
--- 
a/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
+++ 
b/flink-table/flink-table-api-scala-bridge/src/main/scala/org/apache/flink/table/api/bridge/scala/internal/StreamTableEnvironmentImpl.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.api.bridge.scala.internal
 
 import org.apache.flink.annotation.Internal
 import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import org.apache.flink.table.api._
@@ -265,22 +264,6 @@ class StreamTableEnvironmentImpl(
     )
   }
 
-  override protected def validateTableSource(tableSource: TableSource[_]): 
Unit = {
-    super.validateTableSource(tableSource)
-    // check that event-time is enabled if table source includes rowtime 
attributes
-    if (
-      TableSourceValidation.hasRowtimeAttribute(tableSource) &&
-      executionEnvironment.getStreamTimeCharacteristic != 
TimeCharacteristic.EventTime
-    ) {
-      throw new TableException(
-        String.format(
-          "A rowtime attribute requires an EventTime time characteristic in 
stream " +
-            "environment. But is: %s}",
-          executionEnvironment.getStreamTimeCharacteristic
-        ))
-    }
-  }
-
   override def createTemporaryView[T](
       path: String,
       dataStream: DataStream[T],
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
index 422323fcf8a..a6b3a324281 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/DummyStreamExecutionEnvironment.java
@@ -27,7 +27,6 @@ import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.core.execution.CheckpointingMode;
 import org.apache.flink.legacy.table.sinks.StreamTableSink;
 import org.apache.flink.legacy.table.sources.StreamTableSource;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.graph.StreamGraph;
@@ -174,17 +173,6 @@ public class DummyStreamExecutionEnvironment extends 
StreamExecutionEnvironment
         return realExecEnv.getCheckpointingConsistencyMode();
     }
 
-    @Override
-    public void setStreamTimeCharacteristic(TimeCharacteristic characteristic) 
{
-        throw new UnsupportedOperationException(
-                "This is a dummy StreamExecutionEnvironment, 
setStreamTimeCharacteristic method is unsupported.");
-    }
-
-    @Override
-    public TimeCharacteristic getStreamTimeCharacteristic() {
-        return realExecEnv.getStreamTimeCharacteristic();
-    }
-
     @Override
     public JobExecutionResult execute() throws Exception {
         throw new UnsupportedOperationException(
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
index 514e2f247b2..339f8a4aa15 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/AggregateITCase.scala
@@ -19,7 +19,6 @@ package org.apache.flink.table.planner.runtime.stream.sql
 
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.api.java.typeutils.RowTypeInfo
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.datastream.DataStream
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
@@ -181,10 +180,6 @@ class AggregateITCase(aggMode: AggMode, miniBatch: 
MiniBatchMode, backend: State
 
   @TestTemplate
   def testAggregationWithoutWatermark(): Unit = {
-    // NOTE: Different from AggregateITCase, we do not set stream time 
characteristic
-    // of environment to event time, so that emitWatermark() actually does 
nothing.
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
     val data = new mutable.MutableList[(Int, Int)]
     data.+=((1, 1))
     data.+=((2, 2))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
index 1a775b511bd..e55fa33070b 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/JoinITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.scala._
 import org.apache.flink.table.api.config.ExecutionConfigOptions
@@ -1378,10 +1377,6 @@ class JoinITCase(miniBatch: MiniBatchMode, state: 
StateBackendMode, enableAsyncS
 
   @TestTemplate
   def testJoinWithoutWatermark(): Unit = {
-    // NOTE: Different from AggregateITCase, we do not set stream time 
characteristic
-    // of environment to event time, so that emitWatermark() actually does 
nothing.
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
-
     val data1 = new mutable.MutableList[(Int, Long)]
     data1.+=((1, 1L))
     data1.+=((2, 2L))
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
index a8fe68f1d78..07343f4af68 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/runtime/stream/sql/TemporalTableFunctionJoinITCase.scala
@@ -17,7 +17,6 @@
  */
 package org.apache.flink.table.planner.runtime.stream.sql
 
-import org.apache.flink.streaming.api.TimeCharacteristic
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
 import 
org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
 import org.apache.flink.table.api._
@@ -50,7 +49,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode)
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val sqlQuery =
       """
@@ -99,7 +97,6 @@ class TemporalTableFunctionJoinITCase(state: StateBackendMode)
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
     val result = tEnv
       .sqlQuery(
         "SELECT amount, currency, proctime() as proctime " +
@@ -114,7 +111,6 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime)
 
     val sqlQuery =
       """
@@ -170,7 +166,6 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
     env.setParallelism(1)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     val sqlQuery =
       """
@@ -234,7 +229,6 @@ class TemporalTableFunctionJoinITCase(state: 
StateBackendMode)
   def testNestedTemporalJoin(): Unit = {
     val env = StreamExecutionEnvironment.getExecutionEnvironment
     val tEnv = StreamTableEnvironment.create(env, TableTestUtil.STREAM_SETTING)
-    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
 
     val sqlQuery =
       """
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
index 8aa8165012d..832ab36255a 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/utils/TableTestBase.scala
@@ -27,8 +27,8 @@ import 
org.apache.flink.legacy.table.sinks.{AppendStreamTableSink, RetractStream
 import org.apache.flink.legacy.table.sources.StreamTableSource
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.core.JsonParseException
 import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.databind.JsonNode
-import org.apache.flink.streaming.api.{environment, TimeCharacteristic}
 import org.apache.flink.streaming.api.datastream.DataStream
+import org.apache.flink.streaming.api.environment
 import org.apache.flink.streaming.api.environment.{LocalStreamEnvironment, 
StreamExecutionEnvironment}
 import org.apache.flink.table.api._
 import org.apache.flink.table.api.bridge.java.{StreamTableEnvironment => 
JavaStreamTableEnv}
@@ -1717,17 +1717,6 @@ object TableTestUtil {
       .map(
         (f: Array[Expression]) => {
           val fieldsInfo = FieldInfoUtils.getFieldsInfo(streamType, f)
-          // check if event-time is enabled
-          if (
-            fieldsInfo.isRowtimeDefined &&
-            (execEnv.getStreamTimeCharacteristic ne 
TimeCharacteristic.EventTime)
-          ) {
-            throw new ValidationException(
-              String.format(
-                "A rowtime attribute requires an EventTime time characteristic 
in stream " +
-                  "environment. But is: %s",
-                execEnv.getStreamTimeCharacteristic))
-          }
           fieldsInfo
         })
       .getOrElse(FieldInfoUtils.getFieldsInfo(streamType))
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
index 1f20ea83b0a..7853915e9eb 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/StreamTaskTimerITCase.java
@@ -21,7 +21,6 @@ package org.apache.flink.test.streaming.runtime;
 import 
org.apache.flink.api.common.operators.ProcessingTimeService.ProcessingTimeCallback;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.runtime.client.JobExecutionException;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.source.legacy.SourceFunction;
@@ -36,11 +35,7 @@ import org.apache.flink.util.ExceptionUtils;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
 
-import java.util.Arrays;
-import java.util.Collection;
 import java.util.Optional;
 import java.util.concurrent.Semaphore;
 
@@ -52,15 +47,8 @@ import static org.junit.Assert.assertTrue;
  * <p>These tests ensure that exceptions are properly forwarded from the timer 
thread to the task
  * thread and that operator methods are not invoked concurrently.
  */
-@RunWith(Parameterized.class)
 public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 {
 
-    private final TimeCharacteristic timeCharacteristic;
-
-    public StreamTaskTimerITCase(TimeCharacteristic characteristic) {
-        timeCharacteristic = characteristic;
-    }
-
     /**
      * Note: this test fails if we don't check for exceptions in the source 
contexts and do not
      * synchronize in the source contexts.
@@ -69,7 +57,6 @@ public class StreamTaskTimerITCase extends 
AbstractTestBaseJUnit4 {
     public void testOperatorChainedToSource() throws Exception {
 
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(timeCharacteristic);
         env.setParallelism(1);
 
         DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -106,7 +93,6 @@ public class StreamTaskTimerITCase extends 
AbstractTestBaseJUnit4 {
     @Test
     public void testOneInputOperatorWithoutChaining() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(timeCharacteristic);
         env.setParallelism(1);
 
         DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -123,7 +109,6 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 {
     @Test
     public void testTwoInputOperatorWithoutChaining() throws Exception {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-        env.setStreamTimeCharacteristic(timeCharacteristic);
         env.setParallelism(1);
 
         DataStream<String> source = env.addSource(new InfiniteTestSource());
@@ -290,16 +275,4 @@ public class StreamTaskTimerITCase extends AbstractTestBaseJUnit4 {
             running = false;
         }
     }
-
-    // ------------------------------------------------------------------------
-    //  parametrization
-    // ------------------------------------------------------------------------
-
-    @Parameterized.Parameters(name = "Time Characteristic = {0}")
-    public static Collection<Object[]> executionModes() {
-        return Arrays.asList(
-                new Object[] {TimeCharacteristic.ProcessingTime},
-                new Object[] {TimeCharacteristic.IngestionTime},
-                new Object[] {TimeCharacteristic.EventTime});
-    }
 }
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
index cb4799343f9..14238407b46 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/TimestampITCase.java
@@ -42,7 +42,6 @@ import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.client.JobStatusMessage;
 import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
-import org.apache.flink.streaming.api.TimeCharacteristic;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.streaming.api.functions.co.CoMapFunction;
@@ -666,38 +665,11 @@ public class TimestampITCase extends TestLogger {
                 CustomOperator.finalWatermarks[0].get(0).getTimestamp() == 
Long.MAX_VALUE);
     }
 
-    /**
-     * This verifies that an event time source works when setting stream time 
characteristic to
-     * processing time. In this case, the watermarks should just be swallowed 
apart from the last
-     * final watermark marking the end of time.
-     */
-    @Test
-    public void testEventTimeSourceWithProcessingTime() throws Exception {
-        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
-
-        env.setParallelism(2);
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
-
-        DataStream<Integer> source1 = env.addSource(new MyTimestampSource(0, 
10));
-
-        source1.map(new IdentityMap())
-                .transform(
-                        "Watermark Check", BasicTypeInfo.INT_TYPE_INFO, new 
CustomOperator(false));
-
-        env.execute();
-
-        // verify that we don't get any watermarks, the source is used as 
watermark source in
-        // other tests, so it normally emits watermarks
-        Assert.assertTrue(CustomOperator.finalWatermarks[0].size() == 1);
-        Assert.assertEquals(Watermark.MAX_WATERMARK, 
CustomOperator.finalWatermarks[0].get(0));
-    }
-
     @Test
     public void testErrorOnEventTimeOverProcessingTime() {
         StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
 
         env.setParallelism(2);
-        env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
 
         DataStream<Tuple2<String, Integer>> source1 =
                 env.fromData(new Tuple2<>("a", 1), new Tuple2<>("b", 2));

Reply via email to