This is an automated email from the ASF dual-hosted git repository.
openinx pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new 37ded38 Flink: Add job name for RewriteDataFilesAction (#2197)
37ded38 is described below
commit 37ded38b7eba64e930bb6e56a86b5b96294c6501
Author: JunZhang <[email protected]>
AuthorDate: Thu Feb 4 16:10:14 2021 +0800
Flink: Add job name for RewriteDataFilesAction (#2197)
Co-authored-by: zhangjun <[email protected]>
---
.../org/apache/iceberg/flink/actions/RewriteDataFilesAction.java | 6 +++++-
.../main/java/org/apache/iceberg/flink/source/RowDataRewriter.java | 7 ++++---
2 files changed, 9 insertions(+), 4 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
index 6087699..291be0c 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/actions/RewriteDataFilesAction.java
@@ -52,7 +52,11 @@ public class RewriteDataFilesAction extends BaseRewriteDataFilesAction<RewriteDa
int parallelism = Math.min(size, maxParallelism);
DataStream<CombinedScanTask> dataStream =
env.fromCollection(combinedScanTasks);
RowDataRewriter rowDataRewriter = new RowDataRewriter(table(),
caseSensitive(), fileIO(), encryptionManager());
- return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+ try {
+ return rowDataRewriter.rewriteDataForTasks(dataStream, parallelism);
+ } catch (Exception e) {
+ throw new RuntimeException("Rewrite data file error.", e);
+ }
}
@Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
index 8b4986d..b26ecf0 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
@@ -26,7 +26,6 @@ import java.util.stream.Collectors;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
-import org.apache.flink.streaming.api.datastream.DataStreamUtils;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.logical.RowType;
import org.apache.iceberg.CombinedScanTask;
@@ -59,6 +58,7 @@ public class RowDataRewriter {
private final boolean caseSensitive;
private final EncryptionManager encryptionManager;
private final TaskWriterFactory<RowData> taskWriterFactory;
+ private final String tableName;
public RowDataRewriter(Table table, boolean caseSensitive, FileIO io,
EncryptionManager encryptionManager) {
this.schema = table.schema();
@@ -66,6 +66,7 @@ public class RowDataRewriter {
this.io = io;
this.encryptionManager = encryptionManager;
this.nameMapping = PropertyUtil.propertyAsString(table.properties(),
DEFAULT_NAME_MAPPING, null);
+ this.tableName = table.name();
String formatString = PropertyUtil.propertyAsString(table.properties(),
TableProperties.DEFAULT_FILE_FORMAT,
TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
@@ -84,10 +85,10 @@ public class RowDataRewriter {
null);
}
- public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask>
dataStream, int parallelism) {
+ public List<DataFile> rewriteDataForTasks(DataStream<CombinedScanTask>
dataStream, int parallelism) throws Exception {
RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive,
encryptionManager, taskWriterFactory);
DataStream<List<DataFile>> ds =
dataStream.map(map).setParallelism(parallelism);
- return
Lists.newArrayList(DataStreamUtils.collect(ds)).stream().flatMap(Collection::stream)
+ return Lists.newArrayList(ds.executeAndCollect("Rewrite table :" +
tableName)).stream().flatMap(Collection::stream)
.collect(Collectors.toList());
}