This is an automated email from the ASF dual-hosted git repository.
vinoyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new c4bbcb7 [HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (#2579)
c4bbcb7 is described below
commit c4bbcb7f0e83732b63ed7bb81c20470ae8d9a0dc
Author: lamber-ken <[email protected]>
AuthorDate: Wed Feb 17 20:04:38 2021 +0800
[HUDI-1621] Gets the parallelism from context when init StreamWriteOperatorCoordinator (#2579)
---
.../org/apache/hudi/operator/StreamWriteOperatorCoordinator.java | 8 ++++----
.../java/org/apache/hudi/operator/StreamWriteOperatorFactory.java | 7 ++-----
.../main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java | 2 +-
.../src/test/java/org/apache/hudi/operator/StreamWriteITCase.java | 2 +-
hudi-flink/src/test/resources/log4j-surefire.properties | 4 ++--
5 files changed, 10 insertions(+), 13 deletions(-)
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
index bd933c2..787c490 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorCoordinator.java
@@ -398,20 +398,20 @@ public class StreamWriteOperatorCoordinator
public static class Provider implements OperatorCoordinator.Provider {
private final OperatorID operatorId;
private final Configuration conf;
- private final int numTasks;
- public Provider(OperatorID operatorId, Configuration conf, int numTasks) {
+ public Provider(OperatorID operatorId, Configuration conf) {
this.operatorId = operatorId;
this.conf = conf;
- this.numTasks = numTasks;
}
+ @Override
public OperatorID getOperatorId() {
return this.operatorId;
}
+ @Override
public OperatorCoordinator create(Context context) {
- return new StreamWriteOperatorCoordinator(this.conf, this.numTasks);
+ return new StreamWriteOperatorCoordinator(this.conf, context.currentParallelism());
}
}
}
diff --git a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
index 5626745..a558ffd 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/operator/StreamWriteOperatorFactory.java
@@ -39,15 +39,12 @@ public class StreamWriteOperatorFactory<I>
private final StreamWriteOperator<I> operator;
private final Configuration conf;
- private final int numTasks;
public StreamWriteOperatorFactory(
- Configuration conf,
- int numTasks) {
+ Configuration conf) {
super(new StreamWriteOperator<>(conf));
this.operator = (StreamWriteOperator<I>) getOperator();
this.conf = conf;
- this.numTasks = numTasks;
}
@Override
@@ -65,7 +62,7 @@ public class StreamWriteOperatorFactory<I>
@Override
public OperatorCoordinator.Provider getCoordinatorProvider(String s, OperatorID operatorID) {
- return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf, this.numTasks);
+ return new StreamWriteOperatorCoordinator.Provider(operatorID, this.conf);
}
@Override
diff --git a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
index 24b8994..27fd4f5 100644
--- a/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
+++ b/hudi-flink/src/main/java/org/apache/hudi/streamer/HoodieFlinkStreamerV2.java
@@ -74,7 +74,7 @@ public class HoodieFlinkStreamerV2 {
Configuration conf = FlinkOptions.fromStreamerConfig(cfg);
int numWriteTask = conf.getInteger(FlinkOptions.WRITE_TASK_PARALLELISM);
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf, numWriteTask);
+ new StreamWriteOperatorFactory<>(conf);
DataStream<Object> dataStream = env.addSource(new FlinkKafkaConsumer<>(
cfg.kafkaTopic,
diff --git a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
index be8ec36..4297386 100644
--- a/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
+++ b/hudi-flink/src/test/java/org/apache/hudi/operator/StreamWriteITCase.java
@@ -93,7 +93,7 @@ public class StreamWriteITCase extends TestLogger {
(RowType) AvroSchemaConverter.convertToDataType(StreamerUtil.getSourceSchema(conf))
.getLogicalType();
StreamWriteOperatorFactory<HoodieRecord> operatorFactory =
- new StreamWriteOperatorFactory<>(conf, 4);
+ new StreamWriteOperatorFactory<>(conf);
JsonRowDataDeserializationSchema deserializationSchema = new JsonRowDataDeserializationSchema(
rowType,
diff --git a/hudi-flink/src/test/resources/log4j-surefire.properties b/hudi-flink/src/test/resources/log4j-surefire.properties
index 32af462..8dcd17f 100644
--- a/hudi-flink/src/test/resources/log4j-surefire.properties
+++ b/hudi-flink/src/test/resources/log4j-surefire.properties
@@ -15,7 +15,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
###
-log4j.rootLogger=WARN, CONSOLE
+log4j.rootLogger=INFO, CONSOLE
log4j.logger.org.apache=INFO
log4j.logger.org.apache.hudi=DEBUG
log4j.logger.org.apache.hadoop.hbase=ERROR
@@ -27,5 +27,5 @@ log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
log4j.appender.CONSOLE.layout.ConversionPattern=%-4r [%t] %-5p %c %x - %m%n
log4j.appender.CONSOLE.filter.a=org.apache.log4j.varia.LevelRangeFilter
log4j.appender.CONSOLE.filter.a.AcceptOnMatch=true
-log4j.appender.CONSOLE.filter.a.LevelMin=WARN
+log4j.appender.CONSOLE.filter.a.LevelMin=INFO
log4j.appender.CONSOLE.filter.a.LevelMax=FATAL