This is an automated email from the ASF dual-hosted git repository.

openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 37ded38  Flink:  Add job name for RewriteDataFilesAction (#2197)
37ded38 is described below

commit 37ded38b7eba64e930bb6e56a86b5b96294c6501
Author: JunZhang <[email protected]>
AuthorDate: Thu Feb 4 16:10:14 2021 +0800

    Flink:  Add job name for RewriteDataFilesAction (#2197)
    
    Co-authored-by: zhangjun <[email protected]>
---
 .../org/apache/iceberg/flink/actions/RewriteDataFilesAction.java   | 6 +++++-
 .../main/java/org/apache/iceberg/flink/source/RowDataRewriter.java | 7 ++++---
 2 files changed, 9 insertions(+), 4 deletions(-)

diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
index 6087699..291be0c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -52,7 +52,11 @@ public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDa
     int parallelism = Math.min(size, maxParallelism);
     DataStream<CombinedScanTask> dataStream = env.fromCollection(combinedScanTasks);
     RowDataRewriter rowDataRewriter = new RowDataRewriter(table(), caseSensitive(), fileIO(), encryptionManager());
-    return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+    try {
+      return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+    } catch (Exception e) {
+      throw new RuntimeException("Rewrite data file error.", e);
+    }
   }
 
   @Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 8b4986d..b26ecf0 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
 import org.apache.flink.api.common.functions.RichMapFunction;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.iceberg.CombinedScanTask;
@@ -59,6 +58,7 @@ public class RowDataRewriter {
   private final boolean caseSensitive;
   private final EncryptionManager encryptionManager;
   private final TaskWriterFactory<RowData> taskWriterFactory;
+  private final String tableName;
 
   public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
     this.schema = table.schema();
@@ -66,6 +66,7 @@ public class RowDataRewriter {
     this.io = io;
     this.encryptionManager = encryptionManager;
     this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    this.tableName = table.name();
 
     String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
         TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
@@ -84,10 +85,10 @@ public class RowDataRewriter {
         null);
   }
 
-  public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) {
+  public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask> dataStream, int parallelism) throws Exception {
     RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
     DataStream<List<DataFile>> ds = dataStream.map(map).setParallelism(parallelism);
-    return Lists.newArrayList(DataStreamUtils.collect(ds)).stream().flatMap(Collection::stream)
+    return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" + tableName)).stream().flatMap(Collection::stream)
         .collect(Collectors.toList());
   }
 

Reply via email to