This is an automated email from the ASF dual-hosted git repository.
blue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git
The following commit(s) were added to refs/heads/master by this push:
new f950a3e Flink: Add operator to collect data files and append to a table (#1185)
f950a3e is described below
commit f950a3e63c98e88b1f8905faacf33a1d5d31e4c0
Author: openinx <[email protected]>
AuthorDate: Sat Aug 29 01:19:33 2020 +0800
Flink: Add operator to collect data files and append to a table (#1185)
---
.../org/apache/iceberg/flink/IcebergSinkUtil.java | 81 ----
.../iceberg/flink/data/FlinkParquetWriters.java | 3 +-
.../org/apache/iceberg/flink/sink/FlinkSink.java | 220 +++++++++
.../iceberg/flink/sink/IcebergFilesCommitter.java | 229 ++++++++++
.../flink/{ => sink}/IcebergStreamWriter.java | 6 +-
.../flink/{ => sink}/PartitionedFanoutWriter.java | 2 +-
.../flink/{ => sink}/RowDataTaskWriterFactory.java | 42 +-
.../iceberg/flink/{ => sink}/RowDataWrapper.java | 2 +-
.../flink/{ => sink}/TaskWriterFactory.java | 2 +-
.../org/apache/iceberg/flink/SimpleDataUtil.java | 69 ++-
.../iceberg/flink/sink/TestFlinkIcebergSink.java | 178 ++++++++
.../flink/sink/TestIcebergFilesCommitter.java | 505 +++++++++++++++++++++
.../flink/{ => sink}/TestIcebergStreamWriter.java | 10 +-
.../flink/{ => sink}/TestRowDataPartitionKey.java | 3 +-
.../iceberg/flink/{ => sink}/TestTaskWriters.java | 8 +-
15 files changed, 1232 insertions(+), 128 deletions(-)
diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java b/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java
deleted file mode 100644
index 8d80342..0000000
--- a/flink/src/main/java/org/apache/iceberg/flink/IcebergSinkUtil.java
+++ /dev/null
@@ -1,81 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.iceberg.flink;
-
-import java.util.Locale;
-import java.util.Map;
-import org.apache.flink.table.api.TableSchema;
-import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.types.logical.RowType;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-import org.apache.iceberg.types.TypeUtil;
-import org.apache.iceberg.util.PropertyUtil;
-
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
-import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
-import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
-
-class IcebergSinkUtil {
- private IcebergSinkUtil() {
- }
-
- static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
- Preconditions.checkArgument(table != null, "Iceberg table should't be null");
-
- RowType flinkSchema;
- if (requestedSchema != null) {
- // Convert the flink schema to iceberg schema firstly, then reassign ids to match the existing iceberg schema.
- Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
- TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
-
- // We use this flink schema to read values from RowData. The flink's TINYINT and SMALLINT will be promoted to
- // iceberg INTEGER, that means if we use iceberg's table schema to read TINYINT (backend by 1 'byte'), we will
- // read 4 bytes rather than 1 byte, it will mess up the byte array in BinaryRowData. So here we must use flink
- // schema.
- flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
- } else {
- flinkSchema = FlinkSchemaUtil.convert(table.schema());
- }
-
- Map<String, String> props = table.properties();
- long targetFileSize = getTargetFileSizeBytes(props);
- FileFormat fileFormat = getFileFormat(props);
-
- TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
- table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);
-
- return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
- }
-
- private static FileFormat getFileFormat(Map<String, String> properties) {
- String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
- return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
- }
-
- private static long getTargetFileSizeBytes(Map<String, String> properties) {
- return PropertyUtil.propertyAsLong(properties,
- WRITE_TARGET_FILE_SIZE_BYTES,
- WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
- }
-}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
index 1639a44..c91b659 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/data/FlinkParquetWriters.java
@@ -51,7 +51,7 @@ import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.PrimitiveType;
import org.apache.parquet.schema.Type;
-class FlinkParquetWriters {
+public class FlinkParquetWriters {
private FlinkParquetWriters() {
}
@@ -207,6 +207,7 @@ class FlinkParquetWriters {
" wrong precision %s", precision);
return new IntegerDecimalWriter(desc, precision, scale);
}
+
private static ParquetValueWriters.PrimitiveWriter<DecimalData> decimalAsLong(ColumnDescriptor desc,
int precision, int scale) {
Preconditions.checkArgument(precision <= 18, "Cannot write decimal value as long with precision larger than 18, " +
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
new file mode 100644
index 0000000..96ce0ba
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.common.typeinfo.Types;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PropertyUtil;
+
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT;
+import static org.apache.iceberg.TableProperties.DEFAULT_FILE_FORMAT_DEFAULT;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES;
+import static org.apache.iceberg.TableProperties.WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT;
+
+public class FlinkSink {
+
+ private static final String ICEBERG_STREAM_WRITER_NAME = IcebergStreamWriter.class.getSimpleName();
+ private static final String ICEBERG_FILES_COMMITTER_NAME = IcebergFilesCommitter.class.getSimpleName();
+
+ private FlinkSink() {
+ }
+
+ /**
+ * Initialize a {@link Builder} to export the data from a generic input data stream into an iceberg table. We use
+ * {@link RowData} inside the sink connector, so users need to provide a mapper function and a
+ * {@link TypeInformation} to convert those generic records to a {@link RowData} DataStream.
+ *
+ * @param input the generic source input data stream.
+ * @param mapper function to convert the generic data to {@link RowData}
+ * @param outputType to define the {@link TypeInformation} for the input data.
+ * @param <T> the data type of records.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public static <T> Builder builderFor(DataStream<T> input,
+ MapFunction<T, RowData> mapper,
+ TypeInformation<RowData> outputType) {
+ DataStream<RowData> dataStream = input.map(mapper, outputType);
+ return forRowData(dataStream);
+ }
+
+ /**
+ * Initialize a {@link Builder} to export the data from an input data stream of {@link Row}s into an iceberg table. We use
+ * {@link RowData} inside the sink connector, so users need to provide a {@link TableSchema} for the builder to convert
+ * those {@link Row}s to a {@link RowData} DataStream.
+ *
+ * @param input the source input data stream with {@link Row}s.
+ * @param tableSchema defines the {@link TypeInformation} for input data.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public static Builder forRow(DataStream<Row> input, TableSchema tableSchema) {
+ RowType rowType = (RowType) tableSchema.toRowDataType().getLogicalType();
+ DataType[] fieldDataTypes = tableSchema.getFieldDataTypes();
+
+ DataFormatConverters.RowConverter rowConverter = new DataFormatConverters.RowConverter(fieldDataTypes);
+ return builderFor(input, rowConverter::toInternal, RowDataTypeInfo.of(rowType))
+ .tableSchema(tableSchema);
+ }
+
+ /**
+ * Initialize a {@link Builder} to export the data from an input data stream of {@link RowData} into an iceberg table.
+ *
+ * @param input the source input data stream with {@link RowData}s.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public static Builder forRowData(DataStream<RowData> input) {
+ return new Builder().forRowData(input);
+ }
+
+ public static class Builder {
+ private DataStream<RowData> rowDataInput = null;
+ private TableLoader tableLoader;
+ private Configuration hadoopConf;
+ private Table table;
+ private TableSchema tableSchema;
+
+ private Builder() {
+ }
+
+ private Builder forRowData(DataStream<RowData> newRowDataInput) {
+ this.rowDataInput = newRowDataInput;
+ return this;
+ }
+
+ /**
+ * This iceberg {@link Table} instance is used for initializing the {@link IcebergStreamWriter}, which will write all
+ * the records into {@link DataFile}s and emit them to the downstream operator. Providing a table avoids loading the
+ * table from each separate task.
+ *
+ * @param newTable the loaded iceberg table instance.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder table(Table newTable) {
+ this.table = newTable;
+ return this;
+ }
+
+ /**
+ * The table loader is used for loading the table in {@link IcebergFilesCommitter} lazily. We need this loader because
+ * {@link Table} is not serializable, so the table loaded via Builder#table cannot simply be reused in the remote task
+ * manager.
+ *
+ * @param newTableLoader to load iceberg table inside tasks.
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ public Builder tableLoader(TableLoader newTableLoader) {
+ this.tableLoader = newTableLoader;
+ return this;
+ }
+
+ public Builder hadoopConf(Configuration newHadoopConf) {
+ this.hadoopConf = newHadoopConf;
+ return this;
+ }
+
+ public Builder tableSchema(TableSchema newTableSchema) {
+ this.tableSchema = newTableSchema;
+ return this;
+ }
+
+ @SuppressWarnings("unchecked")
+ public DataStreamSink<RowData> build() {
+ Preconditions.checkArgument(rowDataInput != null,
+ "Please use forRowData() to initialize the input DataStream.");
+ Preconditions.checkNotNull(table, "Table shouldn't be null");
+ Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
+ Preconditions.checkNotNull(hadoopConf, "Hadoop configuration shouldn't be null");
+
+ IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, tableSchema);
+ IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, hadoopConf);
+
+ DataStream<Void> returnStream = rowDataInput
+ .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(DataFile.class), streamWriter)
+ .setParallelism(rowDataInput.getParallelism())
+ .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
+ .setParallelism(1)
+ .setMaxParallelism(1);
+
+ return returnStream.addSink(new DiscardingSink())
+ .name(String.format("IcebergSink %s", table.toString()))
+ .setParallelism(1);
+ }
+ }
+
+ static IcebergStreamWriter<RowData> createStreamWriter(Table table, TableSchema requestedSchema) {
+ Preconditions.checkArgument(table != null, "Iceberg table shouldn't be null");
+
+ RowType flinkSchema;
+ if (requestedSchema != null) {
+ // Convert the flink schema to an iceberg schema first, then reassign ids to match the existing iceberg schema.
+ Schema writeSchema = TypeUtil.reassignIds(FlinkSchemaUtil.convert(requestedSchema), table.schema());
+ TypeUtil.validateWriteSchema(table.schema(), writeSchema, true, true);
+
+ // We use this flink schema to read values from RowData. Flink's TINYINT and SMALLINT are promoted to iceberg
+ // INTEGER, so if we used the iceberg table schema to read a TINYINT (backed by 1 byte), we would read 4 bytes
+ // rather than 1 byte and mess up the byte array in BinaryRowData. So here we must use the flink schema.
+ flinkSchema = (RowType) requestedSchema.toRowDataType().getLogicalType();
+ } else {
+ flinkSchema = FlinkSchemaUtil.convert(table.schema());
+ }
+
+ Map<String, String> props = table.properties();
+ long targetFileSize = getTargetFileSizeBytes(props);
+ FileFormat fileFormat = getFileFormat(props);
+
+ TaskWriterFactory<RowData> taskWriterFactory = new RowDataTaskWriterFactory(table.schema(), flinkSchema,
+ table.spec(), table.locationProvider(), table.io(), table.encryption(), targetFileSize, fileFormat, props);
+
+ return new IcebergStreamWriter<>(table.toString(), taskWriterFactory);
+ }
+
+ private static FileFormat getFileFormat(Map<String, String> properties) {
+ String formatString = properties.getOrDefault(DEFAULT_FILE_FORMAT, DEFAULT_FILE_FORMAT_DEFAULT);
+ return FileFormat.valueOf(formatString.toUpperCase(Locale.ENGLISH));
+ }
+
+ private static long getTargetFileSizeBytes(Map<String, String> properties) {
+ return PropertyUtil.propertyAsLong(properties,
+ WRITE_TARGET_FILE_SIZE_BYTES,
+ WRITE_TARGET_FILE_SIZE_BYTES_DEFAULT);
+ }
+}
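
For reference, a minimal usage sketch of the builder above (a sketch only, not part of this commit; `tablePath` is an illustrative variable assumed to point at an existing HadoopTables table, and the classes are the ones already imported in FlinkSink.java and the tests below):

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.enableCheckpointing(60_000); // data files are committed when a checkpoint completes

    Table table = new HadoopTables().load(tablePath);
    RowType rowType = FlinkSchemaUtil.convert(table.schema());

    // A small bounded RowData stream; any upstream operator producing RowData works the same way.
    DataStream<RowData> rowDataStream = env.fromCollection(
        Arrays.asList(
            (RowData) GenericRowData.of(1, StringData.fromString("hello")),
            GenericRowData.of(2, StringData.fromString("world"))),
        RowDataTypeInfo.of(rowType));

    FlinkSink.forRowData(rowDataStream)
        .table(table)                                         // pre-loaded table, used to create the stream writer
        .tableLoader(TableLoader.fromHadoopTable(tablePath))  // re-opened lazily inside the committer task
        .hadoopConf(new Configuration())
        .build();

    env.execute("iceberg-sink-example");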
diff --git a/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
new file mode 100644
index 0000000..e0cfbaf
--- /dev/null
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergFilesCommitter.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableMap;
+import java.util.SortedMap;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.ListTypeInfo;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.runtime.typeutils.SortedMapTypeInfo;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AppendFiles;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.hadoop.SerializableConfiguration;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.apache.iceberg.relocated.com.google.common.collect.Maps;
+import org.apache.iceberg.types.Comparators;
+import org.apache.iceberg.types.Types;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+class IcebergFilesCommitter extends AbstractStreamOperator<Void>
+ implements OneInputStreamOperator<DataFile, Void>, BoundedOneInput {
+
+ private static final long serialVersionUID = 1L;
+ private static final long INITIAL_CHECKPOINT_ID = -1L;
+
+ private static final Logger LOG = LoggerFactory.getLogger(IcebergFilesCommitter.class);
+ private static final String FLINK_JOB_ID = "flink.job-id";
+
+ // The max checkpoint id we've committed to the iceberg table. Since flink's checkpoint id always increases, we only
+ // commit the data files whose checkpoint id is greater than the max committed one, which avoids committing the same
+ // data files twice. This id is attached to the iceberg snapshot metadata when committing the transaction.
+ private static final String MAX_COMMITTED_CHECKPOINT_ID = "flink.max-committed-checkpoint-id";
+
+ // TableLoader to load iceberg table lazily.
+ private final TableLoader tableLoader;
+ private final SerializableConfiguration hadoopConf;
+
+ // A sorted map to maintain the completed data files for each pending checkpointId (which have not been committed
+ // to the iceberg table). We need a sorted map here because it's possible that a few checkpoint snapshots fail. For
+ // example: the 1st checkpoint has 2 data files <1, <file0, file1>>, the 2nd checkpoint has 1 data file <2, <file3>>.
+ // If the snapshot for checkpoint#1 is interrupted because of a network/disk failure etc, we still don't want any
+ // data loss in the iceberg table. So we keep the finished files <1, <file0, file1>> in memory and retry the commit
+ // to the iceberg table when the next checkpoint happens.
+ private final NavigableMap<Long, List<DataFile>> dataFilesPerCheckpoint = Maps.newTreeMap();
+
+ // The data files cache for the current checkpoint. Once the snapshot barrier is received, it will be flushed to
+ // 'dataFilesPerCheckpoint'.
+ private final List<DataFile> dataFilesOfCurrentCheckpoint = Lists.newArrayList();
+
+ // A unique identifier for one flink job.
+ private transient String flinkJobId;
+ private transient Table table;
+ private transient long maxCommittedCheckpointId;
+
+ // All pending checkpoint states for this operator.
+ private static final ListStateDescriptor<SortedMap<Long, List<DataFile>>> STATE_DESCRIPTOR = buildStateDescriptor();
+ private transient ListState<SortedMap<Long, List<DataFile>>> checkpointsState;
+
+ IcebergFilesCommitter(TableLoader tableLoader, Configuration hadoopConf) {
+ this.tableLoader = tableLoader;
+ this.hadoopConf = new SerializableConfiguration(hadoopConf);
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ this.flinkJobId = getContainingTask().getEnvironment().getJobID().toString();
+
+ // Open the table loader and load the table.
+ this.tableLoader.open(hadoopConf.get());
+ this.table = tableLoader.loadTable();
+ this.maxCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+ this.checkpointsState = context.getOperatorStateStore().getListState(STATE_DESCRIPTOR);
+ if (context.isRestored()) {
+ this.maxCommittedCheckpointId = getMaxCommittedCheckpointId(table, flinkJobId);
+ // In the restore path, there should be at least one valid snapshot for the current flink job, so the max committed
+ // checkpoint id should be positive. If it's not positive, someone might have removed or expired the iceberg
+ // snapshot; in that case we should throw an exception rather than risk committing duplicated data files into the
+ // iceberg table.
+ Preconditions.checkState(maxCommittedCheckpointId != INITIAL_CHECKPOINT_ID,
+ "There should be an existing iceberg snapshot for current flink job: %s", flinkJobId);
+
+ SortedMap<Long, List<DataFile>> restoredDataFiles = checkpointsState.get().iterator().next();
+ // Only keep the uncommitted data files in the cache.
+ this.dataFilesPerCheckpoint.putAll(restoredDataFiles.tailMap(maxCommittedCheckpointId + 1));
+ }
+ }
+
+ @Override
+ public void snapshotState(StateSnapshotContext context) throws Exception {
+ super.snapshotState(context);
+ long checkpointId = context.getCheckpointId();
+ LOG.info("Start to flush snapshot state to state backend, table: {},
checkpointId: {}", table, checkpointId);
+
+ // Update the checkpoint state.
+ dataFilesPerCheckpoint.put(checkpointId, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+
+ // Reset the snapshot state to the latest state.
+ checkpointsState.clear();
+ checkpointsState.add(dataFilesPerCheckpoint);
+
+ // Clear the local buffer for current checkpoint.
+ dataFilesOfCurrentCheckpoint.clear();
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) throws Exception {
+ super.notifyCheckpointComplete(checkpointId);
+ // It's possible that we have the following events:
+ // 1. snapshotState(ckpId);
+ // 2. snapshotState(ckpId+1);
+ // 3. notifyCheckpointComplete(ckpId+1);
+ // 4. notifyCheckpointComplete(ckpId);
+ // For step#4, we don't need to commit to the iceberg table again because in step#3 we've already committed all the
+ // files. Besides, we need to keep the max-committed-checkpoint-id increasing.
+ if (checkpointId > maxCommittedCheckpointId) {
+ commitUpToCheckpoint(checkpointId);
+ this.maxCommittedCheckpointId = checkpointId;
+ }
+ }
+
+ private void commitUpToCheckpoint(long checkpointId) {
+ NavigableMap<Long, List<DataFile>> pendingFileMap = dataFilesPerCheckpoint.headMap(checkpointId, true);
+
+ List<DataFile> pendingDataFiles = Lists.newArrayList();
+ for (List<DataFile> dataFiles : pendingFileMap.values()) {
+ pendingDataFiles.addAll(dataFiles);
+ }
+
+ AppendFiles appendFiles = table.newAppend();
+ pendingDataFiles.forEach(appendFiles::appendFile);
+ appendFiles.set(MAX_COMMITTED_CHECKPOINT_ID, Long.toString(checkpointId));
+ appendFiles.set(FLINK_JOB_ID, flinkJobId);
+ appendFiles.commit();
+
+ // Clear the committed data files from dataFilesPerCheckpoint.
+ pendingFileMap.clear();
+ }
+
+ @Override
+ public void processElement(StreamRecord<DataFile> element) {
+ this.dataFilesOfCurrentCheckpoint.add(element.getValue());
+ }
+
+ @Override
+ public void endInput() {
+ // Flush the buffered data files into 'dataFilesPerCheckpoint' first.
+ dataFilesPerCheckpoint.put(Long.MAX_VALUE, ImmutableList.copyOf(dataFilesOfCurrentCheckpoint));
+ dataFilesOfCurrentCheckpoint.clear();
+
+ commitUpToCheckpoint(Long.MAX_VALUE);
+ }
+
+ @Override
+ public void dispose() throws Exception {
+ if (tableLoader != null) {
+ tableLoader.close();
+ }
+ }
+
+ private static ListStateDescriptor<SortedMap<Long, List<DataFile>>> buildStateDescriptor() {
+ Comparator<Long> longComparator = Comparators.forType(Types.LongType.get());
+ // Construct a ListTypeInfo.
+ ListTypeInfo<DataFile> dataFileListTypeInfo = new ListTypeInfo<>(TypeInformation.of(DataFile.class));
+ // Construct a SortedMapTypeInfo.
+ SortedMapTypeInfo<Long, List<DataFile>> sortedMapTypeInfo = new SortedMapTypeInfo<>(
+ BasicTypeInfo.LONG_TYPE_INFO, dataFileListTypeInfo, longComparator
+ );
+ return new ListStateDescriptor<>("iceberg-files-committer-state", sortedMapTypeInfo);
+ }
+
+ static long getMaxCommittedCheckpointId(Table table, String flinkJobId) {
+ Snapshot snapshot = table.currentSnapshot();
+ long lastCommittedCheckpointId = INITIAL_CHECKPOINT_ID;
+
+ while (snapshot != null) {
+ Map<String, String> summary = snapshot.summary();
+ String snapshotFlinkJobId = summary.get(FLINK_JOB_ID);
+ if (flinkJobId.equals(snapshotFlinkJobId)) {
+ String value = summary.get(MAX_COMMITTED_CHECKPOINT_ID);
+ if (value != null) {
+ lastCommittedCheckpointId = Long.parseLong(value);
+ break;
+ }
+ }
+ Long parentSnapshotId = snapshot.parentId();
+ snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
+ }
+
+ return lastCommittedCheckpointId;
+ }
+}
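
As a side note (not part of this commit), the two snapshot summary keys written by the committer above can be read back from the table to see what has been committed; a small hedged sketch, using only APIs already shown in this diff:

    // Inspect the committer's bookkeeping in the latest snapshot summary.
    Table table = new HadoopTables().load(tablePath);
    Map<String, String> summary = table.currentSnapshot().summary();
    String flinkJobId = summary.get("flink.job-id");
    long maxCommittedCheckpointId = Long.parseLong(summary.get("flink.max-committed-checkpoint-id"));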
diff --git a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
similarity index 94%
rename from flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
index e18c293..95daa96 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/IcebergStreamWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/IcebergStreamWriter.java
@@ -17,11 +17,12 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.IOException;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.iceberg.DataFile;
@@ -34,8 +35,8 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<DataFile>
private static final long serialVersionUID = 1L;
private final String fullTableName;
+ private final TaskWriterFactory<T> taskWriterFactory;
- private transient TaskWriterFactory<T> taskWriterFactory;
private transient TaskWriter<T> writer;
private transient int subTaskId;
private transient int attemptId;
@@ -43,6 +44,7 @@ class IcebergStreamWriter<T> extends AbstractStreamOperator<DataFile>
 IcebergStreamWriter(String fullTableName, TaskWriterFactory<T> taskWriterFactory) {
this.fullTableName = fullTableName;
this.taskWriterFactory = taskWriterFactory;
+ setChainingStrategy(ChainingStrategy.ALWAYS);
}
@Override
diff --git a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
similarity index 98%
rename from flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
index 78c29c5..ad84697 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/PartitionedFanoutWriter.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/PartitionedFanoutWriter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.IOException;
import java.util.Map;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
similarity index 80%
rename from flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
index e8c5301..f50f432 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/RowDataTaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java
@@ -17,9 +17,10 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.IOException;
+import java.io.Serializable;
import java.io.UncheckedIOException;
import java.util.Map;
import org.apache.flink.table.data.RowData;
@@ -33,6 +34,7 @@ import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.encryption.EncryptionManager;
import org.apache.iceberg.flink.data.FlinkAvroWriter;
import org.apache.iceberg.flink.data.FlinkOrcWriter;
+import org.apache.iceberg.flink.data.FlinkParquetWriters;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.io.FileIO;
@@ -42,9 +44,10 @@ import org.apache.iceberg.io.OutputFileFactory;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.io.UnpartitionedWriter;
import org.apache.iceberg.orc.ORC;
+import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
-class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
+public class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final Schema schema;
private final RowType flinkSchema;
private final PartitionSpec spec;
@@ -55,17 +58,17 @@ class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
private final FileFormat format;
private final FileAppenderFactory<RowData> appenderFactory;
- private OutputFileFactory outputFileFactory;
-
- RowDataTaskWriterFactory(Schema schema,
- RowType flinkSchema,
- PartitionSpec spec,
- LocationProvider locations,
- FileIO io,
- EncryptionManager encryptionManager,
- long targetFileSizeBytes,
- FileFormat format,
- Map<String, String> tableProperties) {
+ private transient OutputFileFactory outputFileFactory;
+
+ public RowDataTaskWriterFactory(Schema schema,
+ RowType flinkSchema,
+ PartitionSpec spec,
+ LocationProvider locations,
+ FileIO io,
+ EncryptionManager encryptionManager,
+ long targetFileSizeBytes,
+ FileFormat format,
+ Map<String, String> tableProperties) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.spec = spec;
@@ -115,12 +118,12 @@ class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
}
}
- private static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData> {
+ public static class FlinkFileAppenderFactory implements FileAppenderFactory<RowData>, Serializable {
private final Schema schema;
private final RowType flinkSchema;
private final Map<String, String> props;
- private FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map<String, String> props) {
+ public FlinkFileAppenderFactory(Schema schema, RowType flinkSchema, Map<String, String> props) {
this.schema = schema;
this.flinkSchema = flinkSchema;
this.props = props;
@@ -128,7 +131,6 @@ class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
@Override
public FileAppender<RowData> newAppender(OutputFile outputFile, FileFormat format) {
- // TODO MetricsConfig will be used for building parquet RowData writer.
MetricsConfig metricsConfig = MetricsConfig.fromProperties(props);
try {
switch (format) {
@@ -149,6 +151,14 @@ class RowDataTaskWriterFactory implements TaskWriterFactory<RowData> {
.build();
case PARQUET:
+ return Parquet.write(outputFile)
+ .createWriterFunc(msgType -> FlinkParquetWriters.buildWriter(flinkSchema, msgType))
+ .setAll(props)
+ .metricsConfig(metricsConfig)
+ .schema(schema)
+ .overwrite()
+ .build();
+
default:
throw new UnsupportedOperationException("Cannot write unknown file format: " + format);
}
diff --git a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
similarity index 99%
rename from flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
index cf0c09c..27e97ae 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/RowDataWrapper.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataWrapper.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.lang.reflect.Array;
import java.nio.ByteBuffer;
diff --git a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
similarity index 97%
rename from flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
rename to flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
index c47da24..6ed7696 100644
--- a/flink/src/main/java/org/apache/iceberg/flink/TaskWriterFactory.java
+++ b/flink/src/main/java/org/apache/iceberg/flink/sink/TaskWriterFactory.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.Serializable;
import org.apache.iceberg.io.TaskWriter;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
index 82c2a89..b377e54 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/SimpleDataUtil.java
@@ -22,44 +22,58 @@ package org.apache.iceberg.flink;
import java.io.IOException;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.TableSchema;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.types.logical.RowType;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.Table;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.IcebergGenerics;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.sink.RowDataTaskWriterFactory;
+import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.io.CloseableIterable;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
+import static org.apache.iceberg.hadoop.HadoopOutputFile.fromPath;
+
public class SimpleDataUtil {
private SimpleDataUtil() {
}
- static final Schema SCHEMA = new Schema(
+ public static final Schema SCHEMA = new Schema(
Types.NestedField.optional(1, "id", Types.IntegerType.get()),
Types.NestedField.optional(2, "data", Types.StringType.get())
);
- static final TableSchema FLINK_SCHEMA = TableSchema.builder()
+ public static final TableSchema FLINK_SCHEMA = TableSchema.builder()
.field("id", DataTypes.INT())
.field("data", DataTypes.STRING())
.build();
- static final Record RECORD = GenericRecord.create(SCHEMA);
+ public static final RowType ROW_TYPE = (RowType) FLINK_SCHEMA.toRowDataType().getLogicalType();
+
+ public static final Record RECORD = GenericRecord.create(SCHEMA);
- static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
+ public static Table createTable(String path, Map<String, String> properties, boolean partitioned) {
PartitionSpec spec;
if (partitioned) {
spec = PartitionSpec.builderFor(SCHEMA).identity("data").build();
@@ -69,34 +83,55 @@ public class SimpleDataUtil {
return new HadoopTables().create(SCHEMA, spec, properties, path);
}
- static Record createRecord(Integer id, String data) {
+ public static Record createRecord(Integer id, String data) {
Record record = RECORD.copy();
record.setField("id", id);
record.setField("data", data);
return record;
}
- static RowData createRowData(Integer id, String data) {
+ public static RowData createRowData(Integer id, String data) {
return GenericRowData.of(id, StringData.fromString(data));
}
- static void assertTableRows(String tablePath, List<RowData> rows) throws IOException {
- List<Record> records = Lists.newArrayList();
- for (RowData row : rows) {
+ public static DataFile writeFile(Schema schema, PartitionSpec spec, Configuration conf,
+ String location, String filename, List<RowData> rows)
+ throws IOException {
+ Path path = new Path(location, filename);
+ FileFormat fileFormat = FileFormat.fromFileName(filename);
+ Preconditions.checkNotNull(fileFormat, "Cannot determine format for file: %s", filename);
+
+ RowType flinkSchema = FlinkSchemaUtil.convert(schema);
+ FileAppenderFactory<RowData> appenderFactory = new RowDataTaskWriterFactory.FlinkFileAppenderFactory(schema,
+ flinkSchema, ImmutableMap.of());
+
+ FileAppender<RowData> appender = appenderFactory.newAppender(fromPath(path, conf), fileFormat);
+ try (FileAppender<RowData> closeableAppender = appender) {
+ closeableAppender.addAll(rows);
+ }
+
+ return DataFiles.builder(spec)
+ .withInputFile(HadoopInputFile.fromPath(path, conf))
+ .withMetrics(appender.metrics())
+ .build();
+ }
+
+ public static void assertTableRows(String tablePath, List<RowData> expected) throws IOException {
+ List<Record> expectedRecords = Lists.newArrayList();
+ for (RowData row : expected) {
Integer id = row.isNullAt(0) ? null : row.getInt(0);
String data = row.isNullAt(1) ? null : row.getString(1).toString();
- records.add(createRecord(id, data));
+ expectedRecords.add(createRecord(id, data));
}
- assertTableRecords(tablePath, records);
+ assertTableRecords(tablePath, expectedRecords);
}
- static void assertTableRecords(String tablePath, List<Record> expected) throws IOException {
+ public static void assertTableRecords(String tablePath, List<Record> expected) throws IOException {
 Preconditions.checkArgument(expected != null, "expected records shouldn't be null");
 Table newTable = new HadoopTables().load(tablePath);
- Set<Record> resultSet;
- try (CloseableIterable<Record> iterable = (CloseableIterable<Record>) IcebergGenerics.read(newTable).build()) {
- resultSet = Sets.newHashSet(iterable);
+ try (CloseableIterable<Record> iterable = IcebergGenerics.read(newTable).build()) {
+ Assert.assertEquals("Should produce the expected record",
+ Sets.newHashSet(expected), Sets.newHashSet(iterable));
 }
- Assert.assertEquals("Should produce the expected record", resultSet, Sets.newHashSet(expected));
}
}
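
A hedged sketch (illustrative names, not part of this commit) of how a test can use the new SimpleDataUtil.writeFile helper to create a data file and append it directly, assuming an unpartitioned table as in TestIcebergFilesCommitter below:

    // Write two rows into a parquet data file under the table location, then commit it as a new snapshot.
    List<RowData> rows = Lists.newArrayList(
        SimpleDataUtil.createRowData(1, "hello"),
        SimpleDataUtil.createRowData(2, "world"));
    DataFile dataFile = SimpleDataUtil.writeFile(table.schema(), table.spec(), new Configuration(),
        tablePath, FileFormat.PARQUET.addExtension("data-1"), rows);
    table.newAppend().appendFile(dataFile).commit();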
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
new file mode 100644
index 0000000..0aabfda
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestFlinkIcebergSink.java
@@ -0,0 +1,178 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.flink.api.java.typeutils.RowTypeInfo;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.util.FiniteTestSource;
+import org.apache.flink.table.api.TableSchema;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.data.util.DataFormatConverters;
+import org.apache.flink.table.runtime.typeutils.RowDataTypeInfo;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestFlinkIcebergSink extends AbstractTestBase {
+ private static final Configuration CONF = new Configuration();
+ private static final TypeInformation<Row> ROW_TYPE_INFO = new RowTypeInfo(
+ SimpleDataUtil.FLINK_SCHEMA.getFieldTypes());
+ private static final DataFormatConverters.RowConverter CONVERTER = new DataFormatConverters.RowConverter(
+ SimpleDataUtil.FLINK_SCHEMA.getFieldDataTypes());
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private String tablePath;
+ private Table table;
+ private StreamExecutionEnvironment env;
+ private TableLoader tableLoader;
+
+ private final FileFormat format;
+ private final int parallelism;
+ private final boolean partitioned;
+
+ @Parameterized.Parameters(name = "format={0}, parallelism = {1}, partitioned = {2}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] {"avro", 1, true},
+ new Object[] {"avro", 1, false},
+ new Object[] {"avro", 2, true},
+ new Object[] {"avro", 2, false},
+ new Object[] {"orc", 1, true},
+ new Object[] {"orc", 1, false},
+ new Object[] {"orc", 2, true},
+ new Object[] {"orc", 2, false},
+ new Object[] {"parquet", 1, true},
+ new Object[] {"parquet", 1, false},
+ new Object[] {"parquet", 2, true},
+ new Object[] {"parquet", 2, false}
+ };
+ }
+
+ public TestFlinkIcebergSink(String format, int parallelism, boolean partitioned) {
+ this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ this.parallelism = parallelism;
+ this.partitioned = partitioned;
+ }
+
+ @Before
+ public void before() throws IOException {
+ File folder = tempFolder.newFolder();
+ String warehouse = folder.getAbsolutePath();
+
+ tablePath = warehouse.concat("/test");
+ Assert.assertTrue("Should create the table path correctly.", new
File(tablePath).mkdir());
+
+ Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+ table = SimpleDataUtil.createTable(tablePath, props, partitioned);
+
+ env = StreamExecutionEnvironment.getExecutionEnvironment()
+ .enableCheckpointing(100)
+ .setParallelism(parallelism)
+ .setMaxParallelism(parallelism);
+
+ tableLoader = TableLoader.fromHadoopTable(tablePath);
+ }
+
+ private List<RowData> convertToRowData(List<Row> rows) {
+ return rows.stream().map(CONVERTER::toInternal).collect(Collectors.toList());
+ }
+
+ @Test
+ public void testWriteRowData() throws Exception {
+ List<Row> rows = Lists.newArrayList(
+ Row.of(1, "hello"),
+ Row.of(2, "world"),
+ Row.of(3, "foo")
+ );
+ DataStream<RowData> dataStream = env.addSource(new FiniteTestSource<>(rows), ROW_TYPE_INFO)
+ .map(CONVERTER::toInternal, RowDataTypeInfo.of(SimpleDataUtil.ROW_TYPE));
+
+ FlinkSink.forRowData(dataStream)
+ .table(table)
+ .tableLoader(tableLoader)
+ .hadoopConf(CONF)
+ .build();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream");
+
+ // Assert the iceberg table's records. NOTICE: the FiniteTestSource will checkpoint the same rows twice, so it will
+ // commit the same row list into iceberg twice.
+ List<RowData> expectedRows = Lists.newArrayList(Iterables.concat(convertToRowData(rows), convertToRowData(rows)));
+ SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+ }
+
+ private void testWriteRow(TableSchema tableSchema) throws Exception {
+ List<Row> rows = Lists.newArrayList(
+ Row.of(4, "bar"),
+ Row.of(5, "apache")
+ );
+ DataStream<Row> dataStream = env.addSource(new FiniteTestSource<>(rows), ROW_TYPE_INFO);
+
+ FlinkSink.forRow(dataStream, SimpleDataUtil.FLINK_SCHEMA)
+ .table(table)
+ .tableLoader(tableLoader)
+ .tableSchema(tableSchema)
+ .hadoopConf(CONF)
+ .build();
+
+ // Execute the program.
+ env.execute("Test Iceberg DataStream.");
+
+ List<RowData> expectedRows = Lists.newArrayList(Iterables.concat(convertToRowData(rows), convertToRowData(rows)));
+ SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+ }
+
+ @Test
+ public void testWriteRow() throws Exception {
+ testWriteRow(null);
+ }
+
+ @Test
+ public void testWriteRowWithTableSchema() throws Exception {
+ testWriteRow(SimpleDataUtil.FLINK_SCHEMA);
+ }
+}
diff --git a/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
new file mode 100644
index 0000000..e5a9a8d
--- /dev/null
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergFilesCommitter.java
@@ -0,0 +1,505 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iceberg.flink.sink;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.checkpoint.OperatorSubtaskState;
+import org.apache.flink.runtime.operators.testutils.MockEnvironment;
+import org.apache.flink.runtime.operators.testutils.MockEnvironmentBuilder;
+import org.apache.flink.runtime.operators.testutils.MockInputSplitProvider;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.BoundedOneInput;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperatorFactory;
+import org.apache.flink.streaming.api.operators.StreamOperator;
+import org.apache.flink.streaming.api.operators.StreamOperatorParameters;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.RowData;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.AssertHelpers;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.flink.SimpleDataUtil;
+import org.apache.iceberg.flink.TableLoader;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestIcebergFilesCommitter {
+ private static final Configuration CONF = new Configuration();
+
+ @Rule
+ public TemporaryFolder tempFolder = new TemporaryFolder();
+
+ private String tablePath;
+ private Table table;
+
+ private final FileFormat format;
+
+ @Parameterized.Parameters(name = "format = {0}")
+ public static Object[][] parameters() {
+ return new Object[][] {
+ new Object[] {"avro"},
+ new Object[] {"orc"},
+ new Object[] {"parquet"}
+ };
+ }
+
+ public TestIcebergFilesCommitter(String format) {
+ this.format = FileFormat.valueOf(format.toUpperCase(Locale.ENGLISH));
+ }
+
+ @Before
+ public void before() throws IOException {
+ File folder = tempFolder.newFolder();
+ String warehouse = folder.getAbsolutePath();
+
+ tablePath = warehouse.concat("/test");
+ Assert.assertTrue("Should create the table directory correctly.", new
File(tablePath).mkdir());
+
+ // Construct the iceberg table.
+ Map<String, String> props =
ImmutableMap.of(TableProperties.DEFAULT_FILE_FORMAT, format.name());
+ table = SimpleDataUtil.createTable(tablePath, props, false);
+ }
+
+ @Test
+ public void testCommitTxnWithoutDataFiles() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ SimpleDataUtil.assertTableRows(tablePath, Lists.newArrayList());
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ // It's better to advance the max-committed-checkpoint-id in iceberg snapshot, so that the future flink job
+ // failover won't fail.
+ for (int i = 1; i <= 3; i++) {
+ harness.snapshot(++checkpointId, ++timestamp);
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ assertSnapshotSize(i);
+ assertMaxCommittedCheckpointId(jobId, checkpointId);
+ }
+ }
+ }
+
+ @Test
+ public void testCommitTxn() throws Exception {
+ // Test with 3 consecutive checkpoints:
+ // 1. snapshotState for checkpoint#1
+ // 2. notifyCheckpointComplete for checkpoint#1
+ // 3. snapshotState for checkpoint#2
+ // 4. notifyCheckpointComplete for checkpoint#2
+ // 5. snapshotState for checkpoint#3
+ // 6. notifyCheckpointComplete for checkpoint#3
+ long timestamp = 0;
+
+ JobID jobID = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobID)) {
+ harness.setup();
+ harness.open();
+ assertSnapshotSize(0);
+
+ List<RowData> rows = Lists.newArrayListWithExpectedSize(3);
+ for (int i = 1; i <= 3; i++) {
+ RowData rowData = SimpleDataUtil.createRowData(i, "hello" + i);
+ DataFile dataFile = writeDataFile("data-" + i, ImmutableList.of(rowData));
+ harness.processElement(dataFile, ++timestamp);
+ rows.add(rowData);
+
+ harness.snapshot(i, ++timestamp);
+
+ harness.notifyOfCompletedCheckpoint(i);
+
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.copyOf(rows));
+ assertSnapshotSize(i);
+ assertMaxCommittedCheckpointId(jobID, i);
+ }
+ }
+ }
+
+ @Test
+ public void testOrderedEventsBetweenCheckpoints() throws Exception {
+ // It's possible that two checkpoints happen in the following order:
+ // 1. snapshotState for checkpoint#1;
+ // 2. snapshotState for checkpoint#2;
+ // 3. notifyCheckpointComplete for checkpoint#1;
+ // 4. notifyCheckpointComplete for checkpoint#2;
+ long timestamp = 0;
+
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+ DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+ harness.processElement(dataFile1, ++timestamp);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ // 1. snapshotState for checkpoint#1
+ long firstCheckpointId = 1;
+ harness.snapshot(firstCheckpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(2, "world");
+ DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+ harness.processElement(dataFile2, ++timestamp);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ // 2. snapshotState for checkpoint#2
+ long secondCheckpointId = 2;
+ harness.snapshot(secondCheckpointId, ++timestamp);
+
+ // 3. notifyCheckpointComplete for checkpoint#1
+ harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1));
+ assertMaxCommittedCheckpointId(jobId, firstCheckpointId);
+
+ // 4. notifyCheckpointComplete for checkpoint#2
+ harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+ assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ }
+ }
+
+ @Test
+ public void testDisorderedEventsBetweenCheckpoints() throws Exception {
+ // It's possible that the two checkpoints happen in the following order:
+ // 1. snapshotState for checkpoint#1;
+ // 2. snapshotState for checkpoint#2;
+ // 3. notifyCheckpointComplete for checkpoint#2;
+ // 4. notifyCheckpointComplete for checkpoint#1;
+ long timestamp = 0;
+
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness = createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ RowData row1 = SimpleDataUtil.createRowData(1, "hello");
+ DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row1));
+
+ harness.processElement(dataFile1, ++timestamp);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ // 1. snapshotState for checkpoint#1
+ long firstCheckpointId = 1;
+ harness.snapshot(firstCheckpointId, ++timestamp);
+
+ RowData row2 = SimpleDataUtil.createRowData(2, "world");
+ DataFile dataFile2 = writeDataFile("data-2", ImmutableList.of(row2));
+ harness.processElement(dataFile2, ++timestamp);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ // 2. snapshotState for checkpoint#2
+ long secondCheckpointId = 2;
+ harness.snapshot(secondCheckpointId, ++timestamp);
+
+ // 3. notifyCheckpointComplete for checkpoint#2
+ harness.notifyOfCompletedCheckpoint(secondCheckpointId);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+ assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+
+ // 4. notifyCheckpointComplete for checkpoint#1
+ harness.notifyOfCompletedCheckpoint(firstCheckpointId);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row1, row2));
+ assertMaxCommittedCheckpointId(jobId, secondCheckpointId);
+ }
+ }
+
+ @Test
+ public void testRecoveryFromInvalidSnapshot() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ OperatorSubtaskState snapshot;
+
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ RowData row = SimpleDataUtil.createRowData(1, "hello");
+ DataFile dataFile = writeDataFile("data-1", ImmutableList.of(row));
+
+ harness.processElement(dataFile, ++timestamp);
+ snapshot = harness.snapshot(++checkpointId, ++timestamp);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of());
+ }
+
+ // Restore from the given snapshot
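+ // The operator state claims checkpoint#1, but the table has no Iceberg
+ // snapshot committed by this job, so restoring is expected to fail.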
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ AssertHelpers.assertThrows("Could not restore because there's no valid
snapshot.",
+ IllegalStateException.class,
+ "There should be an existing iceberg snapshot for current flink job",
+ () -> {
+ harness.initializeState(snapshot);
+ return null;
+ });
+ }
+ }
+
+ @Test
+ public void testRecoveryFromValidSnapshot() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ List<RowData> expectedRows = Lists.newArrayList();
+ OperatorSubtaskState snapshot;
+
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ RowData row = SimpleDataUtil.createRowData(1, "hello");
+ expectedRows.add(row);
+ DataFile dataFile1 = writeDataFile("data-1", ImmutableList.of(row));
+
+ harness.processElement(dataFile1, ++timestamp);
+ snapshot = harness.snapshot(++checkpointId, ++timestamp);
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ SimpleDataUtil.assertTableRows(tablePath, ImmutableList.of(row));
+ assertSnapshotSize(1);
+ assertMaxCommittedCheckpointId(jobId, checkpointId);
+ }
+
+ // Restore from the given snapshot
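+ // Checkpoint#1 was committed before the "failure", so the restored
+ // committer should pick up exactly where the previous instance left off.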
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ harness.initializeState(snapshot);
+ harness.open();
+
+ SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+ assertSnapshotSize(1);
+ assertMaxCommittedCheckpointId(jobId, checkpointId);
+
+ RowData row = SimpleDataUtil.createRowData(2, "world");
+ expectedRows.add(row);
+ DataFile dataFile = writeDataFile("data-2", ImmutableList.of(row));
+ harness.processElement(dataFile, ++timestamp);
+
+ harness.snapshot(++checkpointId, ++timestamp);
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ SimpleDataUtil.assertTableRows(tablePath, expectedRows);
+ assertSnapshotSize(2);
+ assertMaxCommittedCheckpointId(jobId, checkpointId);
+ }
+ }
+
+ @Test
+ public void testStartAnotherJobToWriteSameTable() throws Exception {
+ long checkpointId = 0;
+ long timestamp = 0;
+ List<RowData> rows = Lists.newArrayList();
+ List<RowData> tableRows = Lists.newArrayList();
+
+ JobID oldJobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(oldJobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(oldJobId, -1L);
+
+ for (int i = 1; i <= 3; i++) {
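+ // Each data file contains every row produced so far, so the expected table
+ // content grows by the full "rows" list on every iteration.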
+ rows.add(SimpleDataUtil.createRowData(i, "hello" + i));
+ tableRows.addAll(rows);
+
+ DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+ harness.processElement(dataFile, ++timestamp);
+ harness.snapshot(++checkpointId, ++timestamp);
+
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ SimpleDataUtil.assertTableRows(tablePath, tableRows);
+ assertSnapshotSize(i);
+ assertMaxCommittedCheckpointId(oldJobId, checkpointId);
+ }
+ }
+
+ // The newly started job will begin counting checkpoints from 1 again.
+ checkpointId = 0;
+ timestamp = 0;
+ JobID newJobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(newJobId)) {
+ harness.setup();
+ harness.open();
+
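+ // The committed checkpoint id is tracked per Flink job: the old job still
+ // reports 3 while the new job starts from scratch.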
+ assertSnapshotSize(3);
+ assertMaxCommittedCheckpointId(oldJobId, 3);
+ assertMaxCommittedCheckpointId(newJobId, -1);
+
+ rows.add(SimpleDataUtil.createRowData(2, "world"));
+ tableRows.addAll(rows);
+
+ DataFile dataFile = writeDataFile("data-new-1", rows);
+ harness.processElement(dataFile, ++timestamp);
+ harness.snapshot(++checkpointId, ++timestamp);
+
+ harness.notifyOfCompletedCheckpoint(checkpointId);
+ SimpleDataUtil.assertTableRows(tablePath, tableRows);
+ assertSnapshotSize(4);
+ assertMaxCommittedCheckpointId(newJobId, checkpointId);
+ }
+ }
+
+ @Test
+ public void testMultipleJobsWriteSameTable() throws Exception {
+ long timestamp = 0;
+ List<RowData> tableRows = Lists.newArrayList();
+
+ JobID[] jobs = new JobID[] {new JobID(), new JobID(), new JobID()};
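+ // Round-robin over three jobs: iteration i is handled by job i % 3, which
+ // is then on its (i / 3 + 1)-th checkpoint.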
+ for (int i = 0; i < 20; i++) {
+ int jobIndex = i % 3;
+ int checkpointId = i / 3;
+ JobID jobId = jobs[jobIndex];
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(i);
+ assertMaxCommittedCheckpointId(jobId, checkpointId == 0 ? -1 :
checkpointId);
+
+ List<RowData> rows =
Lists.newArrayList(SimpleDataUtil.createRowData(i, "word-" + i));
+ tableRows.addAll(rows);
+
+ DataFile dataFile = writeDataFile(String.format("data-%d", i), rows);
+ harness.processElement(dataFile, ++timestamp);
+ harness.snapshot(checkpointId + 1, ++timestamp);
+
+ harness.notifyOfCompletedCheckpoint(checkpointId + 1);
+ SimpleDataUtil.assertTableRows(tablePath, tableRows);
+ assertSnapshotSize(i + 1);
+ assertMaxCommittedCheckpointId(jobId, checkpointId + 1);
+ }
+ }
+ }
+
+ @Test
+ public void testBoundedStream() throws Exception {
+ JobID jobId = new JobID();
+ try (OneInputStreamOperatorTestHarness<DataFile, Void> harness =
createStreamSink(jobId)) {
+ harness.setup();
+ harness.open();
+
+ assertSnapshotSize(0);
+ assertMaxCommittedCheckpointId(jobId, -1L);
+
+ List<RowData> tableRows =
Lists.newArrayList(SimpleDataUtil.createRowData(1, "word-1"));
+
+ DataFile dataFile = writeDataFile("data-1", tableRows);
+ harness.processElement(dataFile, 1);
+ ((BoundedOneInput) harness.getOneInputOperator()).endInput();
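+ // For a bounded input, endInput() is expected to commit the remaining files
+ // using Long.MAX_VALUE as the checkpoint id.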
+
+ SimpleDataUtil.assertTableRows(tablePath, tableRows);
+ assertSnapshotSize(1);
+ assertMaxCommittedCheckpointId(jobId, Long.MAX_VALUE);
+ }
+ }
+
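+ // Writes the given rows to a data file under the table location using the
+ // current file format and returns its DataFile metadata.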
+ private DataFile writeDataFile(String filename, List<RowData> rows) throws
IOException {
+ return SimpleDataUtil.writeFile(table.schema(), table.spec(), CONF,
tablePath, format.addExtension(filename), rows);
+ }
+
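+ // Reads back the highest checkpoint id the committer has recorded in the
+ // table's snapshot metadata for the given job; -1 means nothing committed.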
+ private void assertMaxCommittedCheckpointId(JobID jobID, long expectedId) {
+ table.refresh();
+ long actualId = IcebergFilesCommitter.getMaxCommittedCheckpointId(table,
jobID.toString());
+ Assert.assertEquals(expectedId, actualId);
+ }
+
+ private void assertSnapshotSize(int expectedSnapshotSize) {
+ table.refresh();
+ Assert.assertEquals(expectedSnapshotSize,
Lists.newArrayList(table.snapshots()).size());
+ }
+
+ private OneInputStreamOperatorTestHarness<DataFile, Void>
createStreamSink(JobID jobID)
+ throws Exception {
+ TestOperatorFactory factory = TestOperatorFactory.of(tablePath);
+ return new OneInputStreamOperatorTestHarness<>(factory,
createEnvironment(jobID));
+ }
+
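+ // Builds a mock task environment bound to the given JobID so the committer
+ // observes a stable job id across harness restarts.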
+ private static MockEnvironment createEnvironment(JobID jobID) {
+ return new MockEnvironmentBuilder()
+ .setTaskName("test task")
+ .setManagedMemorySize(32 * 1024)
+ .setInputSplitProvider(new MockInputSplitProvider())
+ .setBufferSize(256)
+ .setTaskConfiguration(new
org.apache.flink.configuration.Configuration())
+ .setExecutionConfig(new ExecutionConfig())
+ .setMaxParallelism(16)
+ .setJobID(jobID)
+ .build();
+ }
+
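+ // Operator factory that wires an IcebergFilesCommitter, loading the table
+ // from the given path, into the test harness.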
+ private static class TestOperatorFactory extends
AbstractStreamOperatorFactory<Void>
+ implements OneInputStreamOperatorFactory<DataFile, Void> {
+ private final String tablePath;
+
+ private TestOperatorFactory(String tablePath) {
+ this.tablePath = tablePath;
+ }
+
+ private static TestOperatorFactory of(String tablePath) {
+ return new TestOperatorFactory(tablePath);
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T extends StreamOperator<Void>> T
createStreamOperator(StreamOperatorParameters<Void> param) {
+ IcebergFilesCommitter committer = new
IcebergFilesCommitter(TableLoader.fromHadoopTable(tablePath), CONF);
+ committer.setup(param.getContainingTask(), param.getStreamConfig(),
param.getOutput());
+ return (T) committer;
+ }
+
+ @Override
+ public Class<? extends StreamOperator> getStreamOperatorClass(ClassLoader
classLoader) {
+ return IcebergFilesCommitter.class;
+ }
+ }
+}
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
similarity index 97%
rename from
flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
rename to
flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
index c6eee46..f4e10d5 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestIcebergStreamWriter.java
+++
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestIcebergStreamWriter.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.File;
import java.io.IOException;
@@ -45,6 +45,7 @@ import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.GenericRecord;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.hadoop.HadoopTables;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableSet;
@@ -70,14 +71,15 @@ public class TestIcebergStreamWriter {
private final FileFormat format;
private final boolean partitioned;
- // TODO add Parquet unit test once the readers and writers are ready.
@Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {"avro", true},
new Object[] {"avro", false},
new Object[] {"orc", true},
- new Object[] {"orc", false}
+ new Object[] {"orc", false},
+ new Object[] {"parquet", true},
+ new Object[] {"parquet", false}
};
}
@@ -315,7 +317,7 @@ public class TestIcebergStreamWriter {
private OneInputStreamOperatorTestHarness<RowData, DataFile>
createIcebergStreamWriter(
Table icebergTable, TableSchema flinkSchema) throws Exception {
- IcebergStreamWriter<RowData> streamWriter =
IcebergSinkUtil.createStreamWriter(icebergTable, flinkSchema);
+ IcebergStreamWriter<RowData> streamWriter =
FlinkSink.createStreamWriter(icebergTable, flinkSchema);
OneInputStreamOperatorTestHarness<RowData, DataFile> harness = new
OneInputStreamOperatorTestHarness<>(
streamWriter, 1, 1, 0);
diff --git
a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
similarity index 99%
rename from
flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
rename to
flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
index 22a8654..d8c0de0 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestRowDataPartitionKey.java
+++
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestRowDataPartitionKey.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.util.List;
import java.util.stream.Collectors;
@@ -31,6 +31,7 @@ import org.apache.iceberg.Schema;
import org.apache.iceberg.data.InternalRecordWrapper;
import org.apache.iceberg.data.RandomGenericData;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.types.Types;
diff --git a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
similarity index 97%
rename from flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
rename to flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
index bb3841e..c4c6979 100644
--- a/flink/src/test/java/org/apache/iceberg/flink/TestTaskWriters.java
+++ b/flink/src/test/java/org/apache/iceberg/flink/sink/TestTaskWriters.java
@@ -17,7 +17,7 @@
* under the License.
*/
-package org.apache.iceberg.flink;
+package org.apache.iceberg.flink.sink;
import java.io.File;
import java.io.IOException;
@@ -35,6 +35,7 @@ import org.apache.iceberg.FileFormat;
import org.apache.iceberg.Table;
import org.apache.iceberg.TableProperties;
import org.apache.iceberg.data.Record;
+import org.apache.iceberg.flink.SimpleDataUtil;
import org.apache.iceberg.flink.data.RandomRowData;
import org.apache.iceberg.io.TaskWriter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
@@ -55,14 +56,15 @@ public class TestTaskWriters {
@Rule
public final TemporaryFolder tempFolder = new TemporaryFolder();
- // TODO add Parquet unit test once the readers and writers are ready.
@Parameterized.Parameters(name = "format = {0}, partitioned = {1}")
public static Object[][] parameters() {
return new Object[][] {
new Object[] {"avro", true},
new Object[] {"avro", false},
new Object[] {"orc", true},
- new Object[] {"orc", false}
+ new Object[] {"orc", false},
+ new Object[] {"parquet", true},
+ new Object[] {"parquet", false}
};
}