This is an automated email from the ASF dual-hosted git repository.

vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new c4bbcb7  [HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (#2579)
c4bbcb7 is described below

commit c4bbcb7f0e83732b63ed7bb81c20470ae8d9a0dc
Author: lamber-ken <[email protected]>
AuthorDate: Wed Feb 17 20:04:38 2021 +0800

    [HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (#2579)
---
 .../org/apache/hudi/operator/StreamWriteOperatorCoordinator.java  | 8 ++++----
 .../java/org/apache/hudi/operator/StreamWriteOperatorFactory.java | 7 ++-----
 .../main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java | 2 +-
 .../src/test/java/org/apache/hudi/operator/StreamWriteITCase.java | 2 +-
 hudi-flink/src/test/resources/log4j-surefire.properties           | 4 ++--
 5 files changed, 10 insertions(+), 13 deletions(-)

diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
index bd933c2..787c490 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -398,20 +398,20 @@ public class StreamWriteOperatorCoordinator
   public static class Provider implements OperatorCoordinator.Provider {
     private final OperatorID operatorId;
     private final Configuration conf;
-    private final int numTasks;
 
-    public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
+    public Provider(OperatorID operatorId, Configuration conf) {
       this.operatorId = operatorId;
       this.conf = conf;
-      this.numTasks = numTasks;
     }
 
+    @Override
     public OperatorID getOperatorId() {
       return this.operatorId;
     }
 
+    @Override
     public OperatorCoordinator create(Context context) {
-      return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
+      return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
     }
   }
 }
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
index 5626745..a558ffd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory<I>
 
   private final StreamWriteOperator<I> operator;
   private final Configuration conf;
-  private final int numTasks;
 
   public StreamWriteOperatorFactory(
-      Configuration conf,
-      int numTasks) {
+      Configuration conf) {
     super(new StreamWriteOperator<>(conf));
     this.operator = (StreamWriteOperator<I>) getOperator();
     this.conf = conf;
-    this.numTasks = numTasks;
   }
 
   @Override
@@ -65,7 +62,7 @@ public class StreamWriteOperatorFactory<I>
 
   @Override
   public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
-    return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
+    return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
   }
 
   @Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
index 24b8994..27fd4f5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -74,7 +74,7 @@ public class HoodieFlinkStreamerV2 {
     Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
     int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf, numWriteTask);
+        new StreamWriteOperatorFactory<>(conf);
 
     DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
         cfg.kafkaTopic,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index be8ec36..4297386 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -93,7 +93,7 @@ public class StreamWriteITCase extends TestLogger {
         (RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
             .getLogicalType();
     StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
-        new StreamWriteOperatorFactory<>(conf, 4);
+        new StreamWriteOperatorFactory<>(conf);
 
     JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
         rowType,
diff --git a/hudi-flink/src/test/resources/log4j-surefire.properties b/hudi-flink/src/test/resources/log4j-surefire.properties
index 32af462..8dcd17f 100644
--- a/hudi-flink/src/test/resources/log4j-surefire.properties
+++ b/hudi-flink/src/test/resources/log4j-surefire.properties
@@ -15,7 +15,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 ###
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=INFO, CONSOLE
 log4j.logger.org.apache=INFO
 log4j.logger.org.apache.hudi=DEBUG
 log4j.logger.org.apache.hadoop.hbase=ERROR
@@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
 log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
 log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
 log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
-log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMin=INFO
 log4j.appender.CONSOLE.filter.a.LevelMax=FATAL

Reply via email to