gaoyunhaii commented on code in PR #20147:
URL: https://github.com/apache/flink/pull/20147#discussion_r935436250


##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##########
@@ -880,6 +889,125 @@ public void testAutoParallelismForExpandedTransformations() {
                         });
     }
 
+    @Test(expected = RuntimeException.class)
+    public void testCacheInStreamModeThrowException() {

Review Comment:
   nit: Throw -> Throws



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SideOutputDataStream.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+
+/**
+ * A {@link SideOutputDataStream} represents a {@link DataStream} that contains elements that are
+ * emitted from upstream into a side output with some tag.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@Public
+public class SideOutputDataStream<T> extends DataStream<T> {
+    /**
+     * Create a new {@link SideOutputDataStream} in the given execution environment.

Review Comment:
   nit: Create -> Creates



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SingleOutputStreamOperator.java:
##########
@@ -437,4 +437,22 @@ public SingleOutputStreamOperator<T> setDescription(String description) {
         transformation.setDescription(description);
         return this;
     }
+
+    /**
+     * Cache the intermediate result of the transformation. Only support bounded streams and
+     * currently only block mode is supported. The cache is generated lazily at the first time the
+     * intermediate result is computed. The cache will be clear when {@link
+     * CachedDataStream#invalidate()} called or the {@link StreamExecutionEnvironment} close.
+     *
+     * @return CachedDataStream that can use in later job to reuse the cached intermediate result.
+     */
+    @PublicEvolving
+    public CachedDataStream<T> cache() {
+        if (!(this.transformation instanceof PhysicalTransformation)) {
+            throw new IllegalStateException(
+                    "Cache can only be called with physical transformation");

Review Comment:
   The message here might need to be updated to "... with physical or side output transformation"
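The `cache()` Javadoc above describes a simple lifecycle: the result is produced lazily the first time it is computed, reused by later jobs, and dropped on `CachedDataStream#invalidate()` or when the `StreamExecutionEnvironment` is closed. A minimal toy model of that lifecycle follows, assuming an in-memory map keyed by one ID per cached stream. `ToyClusterCache` and its methods are hypothetical, `UUID` stands in for the dataset ID, and the sketch ignores the batch-only restriction; it does not reflect how Flink actually stores cluster datasets.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.function.Supplier;

public class CacheLifecycleSketch {

    /** Hypothetical in-memory stand-in for cached intermediate results kept across jobs. */
    static final class ToyClusterCache {
        private final Map<UUID, List<Integer>> datasets = new HashMap<>();
        int computations = 0;

        /** Returns the cached result for id, computing and storing it only on first use. */
        List<Integer> getOrCompute(UUID id, Supplier<List<Integer>> upstream) {
            return datasets.computeIfAbsent(
                    id,
                    key -> {
                        computations++;
                        return upstream.get();
                    });
        }

        /** Drops one cached result, loosely mirroring CachedDataStream#invalidate(). */
        void invalidate(UUID id) {
            datasets.remove(id);
        }

        /** Drops all cached results, loosely mirroring the environment being closed. */
        void close() {
            datasets.clear();
        }
    }

    public static void main(String[] args) {
        ToyClusterCache cache = new ToyClusterCache();
        // One ID per cached stream; UUID stands in for Flink's AbstractID here.
        UUID datasetId = UUID.randomUUID();
        Supplier<List<Integer>> upstream = () -> List.of(1, 2, 3);

        cache.getOrCompute(datasetId, upstream); // first job: computes and caches
        cache.getOrCompute(datasetId, upstream); // later job: reuses the cached result
        System.out.println(cache.computations); // prints 1

        cache.invalidate(datasetId);
        cache.getOrCompute(datasetId, upstream); // recomputed after invalidation
        System.out.println(cache.computations); // prints 2
    }
}
```

Keying by an ID rather than by the upstream object is what lets a later, separately built job find the same cached result.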



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/transformations/CacheTransformation.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.transformations;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.dag.Transformation;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.util.AbstractID;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.Lists;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * When in batch mode, the {@link CacheTransformation} represents the intermediate result of the
+ * upper stream should be cached when it is computed at the first time. And it consumes the cached
+ * intermediate result in later jobs. In stream mode, it has no affect.
+ *
+ * @param <T> The type of the elements in the cache intermediate result.
+ */
+@Internal
+public class CacheTransformation<T> extends Transformation<T> {
+    private final Transformation<T> transformationToCache;
+    private final AbstractID datasetId;
+    private boolean isCached;
+    /**
+     * Creates a new {@code Transformation} with the given name, output type and parallelism.
+     *
+     * @param name The name of the {@code Transformation}, this will be shown in Visualizations and
+     *     the Log
+     */
+    public CacheTransformation(Transformation<T> transformationToCache, String name) {
+        super(name, transformationToCache.getOutputType(), transformationToCache.getParallelism());
+        this.transformationToCache = transformationToCache;
+
+        this.datasetId = new IntermediateDataSetID();

Review Comment:
   `new AbstractID()` ?



##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/datastream/SideOutputDataStream.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.datastream;
+
+import org.apache.flink.annotation.Public;
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.transformations.SideOutputTransformation;
+
+/**
+ * A {@link SideOutputDataStream} represents a {@link DataStream} that contains elements that are
+ * emitted from upstream into a side output with some tag.
+ *
+ * @param <T> The type of the elements in this stream.
+ */
+@Public
+public class SideOutputDataStream<T> extends DataStream<T> {
+    /**
+     * Create a new {@link SideOutputDataStream} in the given execution environment.
+     *
+     * @param environment The StreamExecutionEnvironment
+     * @param transformation The SideOutputTransformation
+     */
+    public SideOutputDataStream(
+            StreamExecutionEnvironment environment, SideOutputTransformation<T> transformation) {
+        super(environment, transformation);
+    }
+
+    /**
+     * Cache the intermediate result of the transformation. Only support bounded streams and

Review Comment:
   nit: Cache -> Caches



##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/CacheITCase.java:
##########
@@ -0,0 +1,331 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.test.streaming.runtime;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.api.common.eventtime.WatermarkStrategy;
+import org.apache.flink.api.common.functions.FlatMapFunction;
+import org.apache.flink.api.common.serialization.SimpleStringEncoder;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.file.sink.FileSink;
+import org.apache.flink.connector.file.src.FileSource;
+import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.minicluster.RpcServiceSharing;
+import org.apache.flink.runtime.scheduler.ClusterDatasetCorruptedException;
+import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
+import org.apache.flink.streaming.api.datastream.CachedDataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.ProcessFunction;
+import org.apache.flink.streaming.api.transformations.CacheTransformation;
+import org.apache.flink.streaming.util.TestStreamEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.CloseableIterator;
+import org.apache.flink.util.CollectionUtil;
+import org.apache.flink.util.Collector;
+import org.apache.flink.util.OutputTag;
+
+import org.apache.commons.io.FileUtils;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.UUID;
+
+import static org.hamcrest.MatcherAssert.assertThat;

Review Comment:
   We should use the new JUnit 5 + AssertJ instead of Hamcrest.



##########
flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamGraphGeneratorTest.java:
##########
@@ -880,6 +889,125 @@ public void testAutoParallelismForExpandedTransformations() {
                         });
     }
 
+    @Test(expected = RuntimeException.class)
+    public void testCacheInStreamModeThrowException() {
+        final TestingStreamExecutionEnvironment env = new TestingStreamExecutionEnvironment();
+        env.setRuntimeMode(RuntimeExecutionMode.STREAMING);
+
+        DataStream<Integer> source = env.fromElements(1, 2, 3);
+        final int upstreamParallelism = 3;
+        CachedDataStream<Integer> cachedStream =
+                source.keyBy(i -> i)
+                        .reduce(Integer::sum)
+                        .setParallelism(upstreamParallelism)
+                        .cache();
+        cachedStream.print();
+        final StreamGraph streamGraph = env.getStreamGraph();

Review Comment:
   Might not need to assign the result to streamGraph
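Since `env.getStreamGraph()` is only called to trigger the expected exception, its result needs no local variable. A minimal, self-contained sketch of that test shape follows. `ToyEnvironment`, `Mode`, and the exception message are hypothetical stand-ins for Flink's `TestingStreamExecutionEnvironment` and `RuntimeExecutionMode`, and `assertThrows` is a hand-rolled analogue of JUnit 5's `Assertions.assertThrows` so the sketch runs without test libraries. With JUnit 5 and AssertJ available, the same check would likely read `assertThatThrownBy(env::getStreamGraph).isInstanceOf(RuntimeException.class)`.

```java
import java.util.ArrayList;
import java.util.List;

public class StreamModeCacheCheckSketch {

    /** Hypothetical stand-in for RuntimeExecutionMode. */
    enum Mode {
        BATCH,
        STREAMING
    }

    /** Hypothetical stand-in for a test environment that records cache() calls. */
    static final class ToyEnvironment {
        private final Mode mode;
        private final List<String> transformations = new ArrayList<>();

        ToyEnvironment(Mode mode) {
            this.mode = mode;
        }

        void addCache(String upstreamName) {
            transformations.add("cache:" + upstreamName);
        }

        /** Mimics graph generation rejecting cached streams outside batch mode. */
        List<String> getStreamGraph() {
            boolean hasCache = transformations.stream().anyMatch(t -> t.startsWith("cache:"));
            if (mode == Mode.STREAMING && hasCache) {
                // Hypothetical message; not Flink's actual wording.
                throw new RuntimeException("cache() is only supported in batch mode");
            }
            return new ArrayList<>(transformations);
        }
    }

    /** Minimal analogue of JUnit 5's assertThrows so the sketch needs no test library. */
    static <E extends Throwable> E assertThrows(Class<E> expected, Runnable call) {
        try {
            call.run();
        } catch (Throwable t) {
            if (expected.isInstance(t)) {
                return expected.cast(t);
            }
            throw new AssertionError("Unexpected exception: " + t, t);
        }
        throw new AssertionError("Expected " + expected.getSimpleName() + " to be thrown");
    }

    public static void main(String[] args) {
        ToyEnvironment env = new ToyEnvironment(Mode.STREAMING);
        env.addCache("reduce");
        // getStreamGraph() is called only to trigger the failure, so its result is not assigned.
        RuntimeException e = assertThrows(RuntimeException.class, env::getStreamGraph);
        System.out.println("Got expected exception: " + e.getMessage());
    }
}
```

Passing `env::getStreamGraph` as the callable keeps the assertion scoped to the one call expected to fail, which `@Test(expected = ...)` cannot do.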



##########
flink-streaming-scala/src/main/scala/org/apache/flink/streaming/api/scala/CachedDataStream.scala:
##########
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.flink.streaming.api.scala
+
+import org.apache.flink.annotation.PublicEvolving
+import org.apache.flink.streaming.api.datastream.{CachedDataStream => JavaCachedDataStream}
+
+@PublicEvolving
+class CachedDataStream[T](javaStream: JavaCachedDataStream[T])

Review Comment:
   Do we also need a Scala wrapper for the new `SideOutputDataStream`?



-- 
This is an automated message from the Apache Git Service.