This is an automated email from the ASF dual-hosted git repository.
rmetzger pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 3e89818 Revert "[FLINK-19850] Add e2e tests for the new FileSink in
streaming mode"
3e89818 is described below
commit 3e89818414728a4c90b7335d2fd2566477b9274a
Author: Robert Metzger <[email protected]>
AuthorDate: Fri Nov 6 18:36:01 2020 +0100
Revert "[FLINK-19850] Add e2e tests for the new FileSink in streaming mode"
This reverts commit dfd2a55065e228b973f9e2343b6252ca308e5398.
---
.../pom.xml | 16 +++-----
.../src/main/java/StreamingFileSinkProgram.java} | 46 +++++++---------------
flink-end-to-end-tests/pom.xml | 2 +-
flink-end-to-end-tests/run-nightly-tests.sh | 7 +---
...st_file_sink.sh => test_streaming_file_sink.sh} | 12 +++---
5 files changed, 28 insertions(+), 55 deletions(-)
diff --git a/flink-end-to-end-tests/flink-file-sink-test/pom.xml
b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
similarity index 83%
rename from flink-end-to-end-tests/flink-file-sink-test/pom.xml
rename to flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
index d14a5f8..6d309e2 100644
--- a/flink-end-to-end-tests/flink-file-sink-test/pom.xml
+++ b/flink-end-to-end-tests/flink-streaming-file-sink-test/pom.xml
@@ -27,8 +27,8 @@ under the License.
<modelVersion>4.0.0</modelVersion>
- <artifactId>flink-file-sink-test</artifactId>
- <name>Flink : E2E Tests : File sink</name>
+ <artifactId>flink-streaming-file-sink-test</artifactId>
+ <name>Flink : E2E Tests : Streaming file sink</name>
<dependencies>
<dependency>
@@ -37,12 +37,6 @@ under the License.
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-connector-files</artifactId>
- <version>${project.version}</version>
- </dependency>
</dependencies>
<build>
@@ -52,16 +46,16 @@ under the License.
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
- <id>FileSinkTestProgram</id>
+
<id>StreamingFileSinkSinkTestProgram</id>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
-
<finalName>FileSinkProgram</finalName>
+
<finalName>StreamingFileSinkProgram</finalName>
<transformers>
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-
<mainClass>FileSinkProgram</mainClass>
+
<mainClass>StreamingFileSinkProgram</mainClass>
</transformer>
</transformers>
</configuration>
diff --git
a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java
b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
similarity index 79%
rename from
flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java
rename to
flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
index 62b0edc..c9cf6d7 100644
---
a/flink-end-to-end-tests/flink-file-sink-test/src/main/java/FileSinkProgram.java
+++
b/flink-end-to-end-tests/flink-streaming-file-sink-test/src/main/java/StreamingFileSinkProgram.java
@@ -24,13 +24,11 @@ import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.common.typeutils.base.IntSerializer;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
-import org.apache.flink.connector.file.sink.FileSink;
import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import
org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
@@ -42,7 +40,7 @@ import java.io.PrintStream;
import java.util.concurrent.TimeUnit;
/**
- * Test program for the {@link StreamingFileSink} and {@link FileSink}.
+ * Test program for the {@link StreamingFileSink}.
*
* <p>Uses a source that steadily emits a deterministic set of records over 60
seconds,
* after which it idles and waits for job cancellation. Every record has a
unique index that is
@@ -52,13 +50,12 @@ import java.util.concurrent.TimeUnit;
* Adding all committed part files together, and numerically sorting the
contents, should
* result in a complete sequence from 0 (inclusive) to 60000 (exclusive).
*/
-public enum FileSinkProgram {
+public enum StreamingFileSinkProgram {
;
public static void main(final String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
final String outputPath = params.getRequired("outputPath");
- final String sinkToTest = params.getRequired("sinkToTest");
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
@@ -66,37 +63,24 @@ public enum FileSinkProgram {
env.enableCheckpointing(5000L);
env.setRestartStrategy(RestartStrategies.fixedDelayRestart(Integer.MAX_VALUE,
Time.of(10L, TimeUnit.SECONDS)));
+ final StreamingFileSink<Tuple2<Integer, Integer>> sink =
StreamingFileSink
+ .forRowFormat(new Path(outputPath),
(Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
+ PrintStream out = new PrintStream(stream);
+ out.println(element.f1);
+ })
+ .withBucketAssigner(new KeyBucketAssigner())
+ .withRollingPolicy(OnCheckpointRollingPolicy.build())
+ .build();
+
// generate data, shuffle, sink
- DataStream<Tuple2<Integer, Integer>> source = env.addSource(new
Generator(10, 10, 60));
-
- if (sinkToTest.equalsIgnoreCase("StreamingFileSink")) {
- final StreamingFileSink<Tuple2<Integer, Integer>> sink
= StreamingFileSink
- .forRowFormat(new Path(outputPath),
(Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
- PrintStream out = new
PrintStream(stream);
- out.println(element.f1);
- })
- .withBucketAssigner(new
KeyBucketAssigner())
-
.withRollingPolicy(OnCheckpointRollingPolicy.build())
- .build();
-
- source.keyBy(0).addSink(sink);
- } else if (sinkToTest.equalsIgnoreCase("FileSink")){
- FileSink<Tuple2<Integer, Integer>> sink = FileSink
- .forRowFormat(new Path(outputPath),
(Encoder<Tuple2<Integer, Integer>>) (element, stream) -> {
- PrintStream out = new
PrintStream(stream);
- out.println(element.f1);
- })
- .withBucketAssigner(new
KeyBucketAssigner())
-
.withRollingPolicy(OnCheckpointRollingPolicy.build())
- .build();
- source.keyBy(0).sinkTo(sink);
- } else {
- throw new UnsupportedOperationException("Unsupported
sink type: " + sinkToTest);
- }
+ env.addSource(new Generator(10, 10, 60))
+ .keyBy(0)
+ .addSink(sink);
env.execute("StreamingFileSinkProgram");
}
+
/**
* Use first field for buckets.
*/
diff --git a/flink-end-to-end-tests/pom.xml b/flink-end-to-end-tests/pom.xml
index 2b6e1be..1eb78e5 100644
--- a/flink-end-to-end-tests/pom.xml
+++ b/flink-end-to-end-tests/pom.xml
@@ -68,7 +68,7 @@ under the License.
<module>flink-confluent-schema-registry</module>
<module>flink-stream-state-ttl-test</module>
<module>flink-sql-client-test</module>
- <module>flink-file-sink-test</module>
+ <module>flink-streaming-file-sink-test</module>
<module>flink-state-evolution-test</module>
<module>flink-rocksdb-state-memory-control-test</module>
<module>flink-end-to-end-tests-common</module>
diff --git a/flink-end-to-end-tests/run-nightly-tests.sh
b/flink-end-to-end-tests/run-nightly-tests.sh
index d3a318a..fa4e656 100755
--- a/flink-end-to-end-tests/run-nightly-tests.sh
+++ b/flink-end-to-end-tests/run-nightly-tests.sh
@@ -185,11 +185,8 @@ run_test "Batch SQL end-to-end test"
"$END_TO_END_DIR/test-scripts/test_batch_sq
run_test "Streaming SQL end-to-end test (Old planner)"
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh old" "skip_check_exceptions"
run_test "Streaming SQL end-to-end test (Blink planner)"
"$END_TO_END_DIR/test-scripts/test_streaming_sql.sh blink"
"skip_check_exceptions"
-run_test "Streaming File Sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local
StreamingFileSink" "skip_check_exceptions"
-run_test "Streaming File Sink s3 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 StreamingFileSink"
"skip_check_exceptions"
-run_test "New File Sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh local FileSink"
"skip_check_exceptions"
-run_test "New File Sink s3 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3 FileSink"
"skip_check_exceptions"
-
+run_test "Streaming File Sink end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh"
"skip_check_exceptions"
+run_test "Streaming File Sink s3 end-to-end test"
"$END_TO_END_DIR/test-scripts/test_streaming_file_sink.sh s3"
"skip_check_exceptions"
run_test "Stateful stream job upgrade end-to-end test"
"$END_TO_END_DIR/test-scripts/test_stateful_stream_job_upgrade.sh 2 4"
run_test "Netty shuffle direct memory consumption end-to-end test"
"$END_TO_END_DIR/test-scripts/test_netty_shuffle_memory_control.sh"
diff --git a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
similarity index 94%
rename from flink-end-to-end-tests/test-scripts/test_file_sink.sh
rename to flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
index 5965c07..0880f62 100755
--- a/flink-end-to-end-tests/test-scripts/test_file_sink.sh
+++ b/flink-end-to-end-tests/test-scripts/test_streaming_file_sink.sh
@@ -18,9 +18,8 @@
################################################################################
OUT_TYPE="${1:-local}" # other type: s3
-SINK_TO_TEST="${2:-"StreamingFileSink"}"
-S3_PREFIX=temp/test_file_sink-$(uuidgen)
+S3_PREFIX=temp/test_streaming_file_sink-$(uuidgen)
OUTPUT_PATH="$TEST_DATA_DIR/$S3_PREFIX"
S3_OUTPUT_PATH="s3://$IT_CASE_S3_BUCKET/$S3_PREFIX"
source "$(dirname "$0")"/common.sh
@@ -61,7 +60,7 @@ if [ "${OUT_TYPE}" == "s3" ]; then
on_exit out_cleanup
fi
-TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-file-sink-test/target/FileSinkProgram.jar"
+TEST_PROGRAM_JAR="${END_TO_END_DIR}/flink-streaming-file-sink-test/target/StreamingFileSinkProgram.jar"
###################################
# Get all lines in part files and sort them numerically.
@@ -136,7 +135,7 @@ function wait_for_complete_result {
done
}
-function run_file_sink_test {
+function run_streaming_file_sink_test {
start_cluster
"${FLINK_DIR}/bin/taskmanager.sh" start
@@ -144,8 +143,7 @@ function run_file_sink_test {
"${FLINK_DIR}/bin/taskmanager.sh" start
echo "Submitting job."
- CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}"
--outputPath "${JOB_OUTPUT_PATH}" \
- --sinkToTest "${SINK_TO_TEST}")
+ CLIENT_OUTPUT=$("$FLINK_DIR/bin/flink" run -d "${TEST_PROGRAM_JAR}"
--outputPath "${JOB_OUTPUT_PATH}")
JOB_ID=$(echo "${CLIENT_OUTPUT}" | grep "Job has been submitted with JobID"
| sed 's/.* //g')
if [[ -z $JOB_ID ]]; then
@@ -189,4 +187,4 @@ function run_file_sink_test {
}
# usual runtime is ~6 minutes
-run_test_with_timeout 900 run_file_sink_test
+run_test_with_timeout 900 run_streaming_file_sink_test