openinx commented on a change in pull request #1623:
URL: https://github.com/apache/iceberg/pull/1623#discussion_r514774682
##########
File path: core/src/main/java/org/apache/iceberg/actions/BaseRewriteDataFilesAction.java
##########
@@ -268,7 +272,7 @@ private void replaceDataFiles(Iterable<DataFile> deletedDataFiles, Iterable<Data
}
}
- protected abstract FileIO fileIO();
+ protected abstract FileIO setFileIO();
Review comment:
why `setFileIO`? That's quite confusing: we're getting a `FileIO` here, yet the name `setFileIO` reads like a setter. I don't think we need to change this.
##########
File path: flink/src/main/java/org/apache/iceberg/actions/Actions.java
##########
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
Review comment:
I'm thinking we may need to move this class into `org.apache.iceberg.flink.actions`: flink and spark both define a class named `Actions` under the package `org.apache.iceberg.actions`, so how would someone identify the right class when executing actions for both flink and spark in the same project?
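To make the clash concrete, a hypothetical snippet from a project that depends on both modules (class layout assumed, names illustrative):
```java
import org.apache.iceberg.Table;

public class CompactWithBothEngines {

  // Both iceberg-flink and iceberg-spark would ship a class named
  // org.apache.iceberg.actions.Actions, so this fully qualified reference
  // is ambiguous: the JVM loads whichever jar comes first on the classpath,
  // and no import or qualification syntax can select the other one.
  public static void compact(Table table) {
    org.apache.iceberg.actions.Actions.forTable(table)
        .rewriteDataFiles()
        .execute();
  }
}
```
Moving the flink class to `org.apache.iceberg.flink.actions.Actions` gives each engine a distinct fully qualified name, so both can coexist on one classpath.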
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
Review comment:
nit: could we separate this `format` parsing into its own code block by leaving an empty line before it? That makes the code clearer.
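For example, a sketch of the suggested layout (only a blank line is added before the format parsing; everything else is unchanged from the diff above):
```java
public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
  this.schema = table.schema();
  this.io = io;
  this.encryptionManager = encryptionManager;
  this.caseSensitive = caseSensitive;
  this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);

  // Parse the default file format in its own block.
  String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
      TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
  this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));

  // ... remaining writer-factory setup unchanged
}
```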
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ this.taskWriterFactory = new RowDataTaskWriterFactory(
+ table.schema(),
+ flinkSchema,
+ table.spec(),
+ table.locationProvider(),
+ io,
+ encryptionManager,
+ Long.MAX_VALUE,
+ format,
+ table.properties());
+ }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
+
+ private transient TaskWriter<RowData> writer;
+ private transient int subTaskId;
+ private transient int attemptId;
+
+ private final Schema schema;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
+                      EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
+ this.schema = schema;
+ this.nameMapping = nameMapping;
+ this.io = io;
+ this.caseSensitive = caseSensitive;
+ this.encryptionManager = encryptionManager;
+ this.taskWriterFactory = taskWriterFactory;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ this.attemptId = getRuntimeContext().getAttemptNumber();
+ // Initialize the task writer factory.
+ this.taskWriterFactory.initialize(subTaskId, attemptId);
+ }
+
+ @Override
+ public List<DataFile> map(CombinedScanTask task) throws Exception {
+ // Initialize the task writer.
+ this.writer = taskWriterFactory.create();
+ RowDataIterator iterator =
+          new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive);
+ try {
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ writer.write(rowData);
+ }
+ iterator.close();
+ return Lists.newArrayList(writer.complete());
+ } catch (Throwable originalThrowable) {
+ try {
Review comment:
I think we've forgotten to close the `RowDataIterator` if any exception is thrown while iterating over the records. Please handle this carefully: if the close fails here, we won't want to close it again in the `catch` block.
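One possible shape (just a sketch): let try-with-resources own the iterator so it is closed exactly once on both paths, and keep the abort handling for the writer only:
```java
@Override
public List<DataFile> map(CombinedScanTask task) throws Exception {
  TaskWriter<RowData> writer = taskWriterFactory.create();
  // try-with-resources closes the iterator on the success path and during an
  // exception unwind; a failing close() inside the unwind is attached to the
  // original exception automatically as a suppressed exception.
  try (RowDataIterator iterator =
           new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive)) {
    while (iterator.hasNext()) {
      writer.write(iterator.next());
    }
    return Lists.newArrayList(writer.complete());
  } catch (Throwable originalThrowable) {
    // keep the existing abort + addSuppressed handling for the writer here
    throw originalThrowable;
  }
}
```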
##########
File path: flink/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
Review comment:
I think we should also provide a unit test similar to the spark `testRewriteLargeTableHasResiduals`, which tests the `ignoreResiduals` option.
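A rough sketch of what that test could look like in this suite (data volume and filter are illustrative, and I'm assuming `TableScan#ignoreResiduals()` here; the key assertion is that planned tasks carry no residual expression):
```java
@Test
public void testRewriteDataFilesIgnoreResiduals() throws Exception {
  sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
  sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
  icebergTableUnPartitioned.refresh();

  // Plan with a filter that only partially matches the file contents;
  // ignoreResiduals() should drop the leftover (residual) expression.
  CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan()
      .ignoreResiduals()
      .filter(Expressions.equal("id", 1))
      .planFiles();
  for (FileScanTask task : tasks) {
    Assert.assertEquals("Residuals must be ignored", Expressions.alwaysTrue(), task.residual());
  }

  Actions.forTable(icebergTableUnPartitioned)
      .rewriteDataFiles()
      .filter(Expressions.equal("id", 1))
      .execute();
}
```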
##########
File path: flink/src/main/java/org/apache/iceberg/actions/RewriteDataFilesAction.java
##########
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.api.java.ExecutionEnvironment;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.source.RowDataRewriter;
+import org.apache.iceberg.io.FileIO;
+
+public class RewriteDataFilesAction extends
Review comment:
nit: no need to wrap the line here.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
Review comment:
nit: could we keep the constructor's argument assignments in the same order as the field definitions? For example:
```java
private final int a;
private final int b;

public Construct(int a, int b) {
  this.a = a;
  this.b = b;
}
```
##########
File path: core/src/main/java/org/apache/iceberg/util/PropertyUtil.java
##########
@@ -52,4 +52,13 @@ public static long propertyAsLong(Map<String, String> properties,
}
return defaultValue;
}
+
+ public static String propertyAsString(Map<String, String> properties,
+ String property, String defaultValue) {
Review comment:
nit: code format?
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
Review comment:
do we really need to expose this class to public? apache iceberg is a library, so we try to avoid exposing unnecessary internal classes or interfaces, so that users won't abuse them.
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ this.taskWriterFactory = new RowDataTaskWriterFactory(
+ table.schema(),
+ flinkSchema,
+ table.spec(),
+ table.locationProvider(),
+ io,
+ encryptionManager,
+ Long.MAX_VALUE,
+ format,
+ table.properties());
+ }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
+
+ private transient TaskWriter<RowData> writer;
Review comment:
nit: could this just be a local variable? It doesn't have to be a transient field.
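i.e. a sketch where only the writer's scope changes (the rest of `map()` is as in the diff above):
```java
@Override
public List<DataFile> map(CombinedScanTask task) throws Exception {
  // A local variable is enough: the writer never outlives a single map()
  // call, so there is nothing for a transient field to carry between calls.
  TaskWriter<RowData> writer = taskWriterFactory.create();
  RowDataIterator iterator =
      new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive);
  try {
    while (iterator.hasNext()) {
      writer.write(iterator.next());
    }
    iterator.close();
    return Lists.newArrayList(writer.complete());
  } catch (Throwable originalThrowable) {
    // existing abort handling unchanged
    throw originalThrowable;
  }
}
```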
##########
File path: flink/src/main/java/org/apache/iceberg/flink/source/RowDataRewriter.java
##########
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.source;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Locale;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.functions.RichMapFunction;
+import org.apache.flink.api.java.DataSet;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.iceberg.CombinedScanTask;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.encryption.EncryptionManager;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.flink.sink.TaskWriterFactory;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.TaskWriter;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.util.PropertyUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_NAME_MAPPING;
+
+public class RowDataRewriter {
+
+  private static final Logger LOG = LoggerFactory.getLogger(RowDataRewriter.class);
+
+ private final Schema schema;
+ private final FileFormat format;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+  public RowDataRewriter(Table table, boolean caseSensitive, FileIO io, EncryptionManager encryptionManager) {
+ this.schema = table.schema();
+ this.io = io;
+ this.encryptionManager = encryptionManager;
+ this.caseSensitive = caseSensitive;
+    this.nameMapping = PropertyUtil.propertyAsString(table.properties(), DEFAULT_NAME_MAPPING, null);
+    String formatString = PropertyUtil.propertyAsString(table.properties(), TableProperties.DEFAULT_FILE_FORMAT,
+ TableProperties.DEFAULT_FILE_FORMAT_DEFAULT);
+ this.format = FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ RowType flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ this.taskWriterFactory = new RowDataTaskWriterFactory(
+ table.schema(),
+ flinkSchema,
+ table.spec(),
+ table.locationProvider(),
+ io,
+ encryptionManager,
+ Long.MAX_VALUE,
+ format,
+ table.properties());
+ }
+
+  public List<DataFile> rewriteDataForTasks(DataSet<CombinedScanTask> dataSet) throws Exception {
+    RewriteMap map = new RewriteMap(schema, nameMapping, io, caseSensitive, encryptionManager, taskWriterFactory);
+    DataSet<List<DataFile>> ds = dataSet.map(map);
+    return ds.collect().stream().flatMap(Collection::stream).collect(Collectors.toList());
+ }
+
+  public static class RewriteMap extends RichMapFunction<CombinedScanTask, List<DataFile>> {
+
+ private transient TaskWriter<RowData> writer;
+ private transient int subTaskId;
+ private transient int attemptId;
+
+ private final Schema schema;
+ private final String nameMapping;
+ private final FileIO io;
+ private final boolean caseSensitive;
+ private final EncryptionManager encryptionManager;
+ private final TaskWriterFactory<RowData> taskWriterFactory;
+
+    public RewriteMap(Schema schema, String nameMapping, FileIO io, boolean caseSensitive,
+                      EncryptionManager encryptionManager, TaskWriterFactory<RowData> taskWriterFactory) {
+ this.schema = schema;
+ this.nameMapping = nameMapping;
+ this.io = io;
+ this.caseSensitive = caseSensitive;
+ this.encryptionManager = encryptionManager;
+ this.taskWriterFactory = taskWriterFactory;
+ }
+
+ @Override
+ public void open(Configuration parameters) {
+ this.subTaskId = getRuntimeContext().getIndexOfThisSubtask();
+ this.attemptId = getRuntimeContext().getAttemptNumber();
+ // Initialize the task writer factory.
+ this.taskWriterFactory.initialize(subTaskId, attemptId);
+ }
+
+ @Override
+ public List<DataFile> map(CombinedScanTask task) throws Exception {
+ // Initialize the task writer.
+ this.writer = taskWriterFactory.create();
+ RowDataIterator iterator =
+          new RowDataIterator(task, io, encryptionManager, schema, schema, nameMapping, caseSensitive);
+ try {
+ while (iterator.hasNext()) {
+ RowData rowData = iterator.next();
+ writer.write(rowData);
+ }
+ iterator.close();
+ return Lists.newArrayList(writer.complete());
+ } catch (Throwable originalThrowable) {
+ try {
+          LOG.error("Aborting commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+ writer.abort();
+          LOG.error("Aborted commit for (subTaskId {}, attemptId {})", subTaskId, attemptId);
+ } catch (Throwable inner) {
+ if (originalThrowable != inner) {
+ originalThrowable.addSuppressed(inner);
Review comment:
What's the reason that we need to suppress the `inner` exception when it's different from the `originalThrowable`?
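For context, a minimal standalone sketch of why the identity check matters: `Throwable#addSuppressed` rejects self-suppression with an `IllegalArgumentException`, and cleanup code can rethrow the very same instance it is cleaning up after:
```java
public class SelfSuppressionDemo {

  public static void main(String[] args) {
    RuntimeException original = new RuntimeException("original failure");
    try {
      // Hypothetical cleanup that rethrows the exact same instance.
      throw original;
    } catch (RuntimeException inner) {
      if (inner != original) {
        original.addSuppressed(inner);  // safe: genuinely different failure
      }
      // Without the guard, original.addSuppressed(original) would throw
      // IllegalArgumentException("Self-suppression not permitted").
    }
    System.out.println("suppressed count: " + original.getSuppressed().length);  // prints 0
  }
}
```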
##########
File path: flink/src/test/java/org/apache/iceberg/actions/TestRewriteDataFilesAction.java
##########
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.iceberg.actions;
+
+import java.util.List;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.flink.FlinkCatalogTestBase;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestRewriteDataFilesAction extends FlinkCatalogTestBase {
+
+  private static final String TABLE_NAME_UNPARTITIONED = "test_table_unpartitioned";
+  private static final String TABLE_NAME_PARTITIONED = "test_table_partitioned";
+ private final FileFormat format;
+ private Table icebergTableUnPartitioned;
+ private Table icebergTablePartitioned;
+
+  public TestRewriteDataFilesAction(String catalogName, String[] baseNamespace, FileFormat format) {
+ super(catalogName, baseNamespace);
+ this.format = format;
+ }
+
+  @Parameterized.Parameters(name = "catalogName={0}, baseNamespace={1}, format={2}")
+ public static Iterable<Object[]> parameters() {
+ List<Object[]> parameters = Lists.newArrayList();
+    for (FileFormat format : new FileFormat[] {FileFormat.ORC, FileFormat.AVRO, FileFormat.PARQUET}) {
+ for (Object[] catalogParams : FlinkCatalogTestBase.parameters()) {
+ String catalogName = (String) catalogParams[0];
+ String[] baseNamespace = (String[]) catalogParams[1];
+ parameters.add(new Object[] {catalogName, baseNamespace, format});
+ }
+ }
+ return parameters;
+ }
+
+ @Before
+ public void before() {
+ super.before();
+ sql("CREATE DATABASE %s", flinkDatabase);
+ sql("USE CATALOG %s", catalogName);
+ sql("USE %s", DATABASE);
+    sql("CREATE TABLE %s (id int, data varchar) with ('write.format.default'='%s')", TABLE_NAME_UNPARTITIONED,
+        format.name());
+    icebergTableUnPartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_UNPARTITIONED));
+
+    sql("CREATE TABLE %s (id int, data varchar) PARTITIONED BY (data) with ('write.format.default'='%s')",
+        TABLE_NAME_PARTITIONED, format.name());
+    icebergTablePartitioned = validationCatalog.loadTable(TableIdentifier.of(icebergNamespace,
+        TABLE_NAME_PARTITIONED));
+ }
+
+ @After
+ public void clean() {
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_UNPARTITIONED);
+    sql("DROP TABLE IF EXISTS %s.%s", flinkDatabase, TABLE_NAME_PARTITIONED);
+ sql("DROP DATABASE IF EXISTS %s", flinkDatabase);
+ super.clean();
+ }
+
+ @Test
+ public void testRewriteDataFilesEmptyTable() throws Exception {
+    Assert.assertNull("Table must be empty", icebergTableUnPartitioned.currentSnapshot());
+    Actions.forTable(icebergTableUnPartitioned)
+        .rewriteDataFiles()
+        .execute();
+    Assert.assertNull("Table must stay empty", icebergTableUnPartitioned.currentSnapshot());
+ }
+
+
+ @Test
+ public void testRewriteDataFilesUnpartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_UNPARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_UNPARTITIONED);
+
+ icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files before rewrite", 2, dataFiles.size());
+
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTableUnPartitioned)
+ .rewriteDataFiles()
+ .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+ icebergTableUnPartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTableUnPartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 1 data file after rewrite", 1, dataFiles1.size());
+
+ // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTableUnPartitioned, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "hello"),
+ SimpleDataUtil.createRecord(2, "world")
+ ));
+ }
+
+ @Test
+ public void testRewriteDataFilesPartitionedTable() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'hello'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 4, 'world'", TABLE_NAME_PARTITIONED);
+
+ icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files before rewrite", 4, dataFiles.size());
+
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTablePartitioned)
+ .rewriteDataFiles()
+ .execute();
+
+    Assert.assertEquals("Action should rewrite 4 data files", 4, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 2 data files", 2, result.addedDataFiles().size());
+
+ icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 2 data files after rewrite", 2, dataFiles1.size());
+
+ // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
+ SimpleDataUtil.createRecord(1, "hello"),
+ SimpleDataUtil.createRecord(2, "hello"),
+ SimpleDataUtil.createRecord(3, "world"),
+ SimpleDataUtil.createRecord(4, "world")
+ ));
+ }
+
+
+ @Test
+ public void testRewriteDataFilesWithFilter() throws Exception {
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 1, 'hello'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 1, 'world'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 2, 'world'", TABLE_NAME_PARTITIONED);
+    sql("INSERT INTO %s SELECT 3, 'world'", TABLE_NAME_PARTITIONED);
+
+ icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles = Lists.newArrayList(CloseableIterable.transform(tasks, FileScanTask::file));
+    Assert.assertEquals("Should have 5 data files before rewrite", 5, dataFiles.size());
+
+ RewriteDataFilesActionResult result =
+ Actions.forTable(icebergTablePartitioned)
+ .rewriteDataFiles()
+ .filter(Expressions.equal("id", 1))
+ .filter(Expressions.startsWith("data", "he"))
+ .execute();
+
+    Assert.assertEquals("Action should rewrite 2 data files", 2, result.deletedDataFiles().size());
+    Assert.assertEquals("Action should add 1 data file", 1, result.addedDataFiles().size());
+
+ icebergTablePartitioned.refresh();
+
+    CloseableIterable<FileScanTask> tasks1 = icebergTablePartitioned.newScan().planFiles();
+    List<DataFile> dataFiles1 = Lists.newArrayList(CloseableIterable.transform(tasks1, FileScanTask::file));
+    Assert.assertEquals("Should have 4 data files after rewrite", 4, dataFiles1.size());
+
+ // Assert the table records as expected.
+    SimpleDataUtil.assertTableRecords(icebergTablePartitioned, Lists.newArrayList(
Review comment:
I think we may need to refactor `SimpleDataUtil.assertTableRecords`: it uses a `Set` to check the expected records, which means it de-duplicates identical records automatically and won't catch the case where there are two `<1, 'hello'>` records.
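One possible shape for the fix (a sketch; the helper name and generics are illustrative, and the `java.util` / `java.util.stream` imports are omitted): compare occurrence counts instead of sets so duplicates are preserved:
```java
// A duplicate-safe comparison: counting occurrences keeps two identical
// <1, 'hello'> rows from collapsing into one, unlike a Set-based check.
private static <T> void assertSameElements(List<T> expected, List<T> actual) {
  Map<T, Long> expectedCounts = expected.stream()
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  Map<T, Long> actualCounts = actual.stream()
      .collect(Collectors.groupingBy(Function.identity(), Collectors.counting()));
  Assert.assertEquals("Should have the same records with the same multiplicities",
      expectedCounts, actualCounts);
}
```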
##########
File path: flink/src/test/java/org/apache/iceberg/flink/FlinkTestBase.java
##########
@@ -72,7 +72,7 @@ protected TableEnvironment getTableEnv() {
return tEnv;
}
- List<Object[]> sql(String query, Object... args) {
+ public List<Object[]> sql(String query, Object... args) {
Review comment:
How about using `protected`?