This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 7b4f0e7430 [core] Introduce format table write (#6288)
7b4f0e7430 is described below
commit 7b4f0e74309407ccac9f2efdd803a2abb101cb7d
Author: jerry <[email protected]>
AuthorDate: Thu Oct 9 10:41:59 2025 +0800
[core] Introduce format table write (#6288)
---
.../paimon/append/RollingBlobFileWriter.java | 6 +-
.../apache/paimon/io/FileWriterAbortExecutor.java | 37 +++++
.../paimon/io/FormatTableRollingFileWriter.java | 144 +++++++++++++++++++
...riter.java => FormatTableSingleFileWriter.java} | 123 +++++-----------
.../apache/paimon/io/RollingFileWriterImpl.java | 5 +-
.../org/apache/paimon/io/SingleFileWriter.java | 20 +--
.../java/org/apache/paimon/table/FormatTable.java | 3 +-
.../table/format/FormatBatchWriteBuilder.java | 81 +++++++++++
.../paimon/table/format/FormatTableFileWriter.java | 118 ++++++++++++++++
.../table/format/FormatTableRecordWriter.java | 87 ++++++++++++
.../paimon/table/format/FormatTableWrite.java | 154 +++++++++++++++++++++
.../paimon/table/format/TwoPhaseCommitMessage.java | 53 +++++++
.../sink/FormatTableRowPartitionKeyExtractor.java | 47 +++++++
.../apache/paimon/utils/FileStorePathFactory.java | 30 +++-
.../org/apache/paimon/catalog/CatalogTestBase.java | 140 ++++---------------
15 files changed, 817 insertions(+), 231 deletions(-)
diff --git
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
index b4b15f2ef9..17d4be7c76 100644
---
a/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/append/RollingBlobFileWriter.java
@@ -26,11 +26,11 @@ import org.apache.paimon.fs.FileIO;
import org.apache.paimon.io.BundleRecords;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FileWriterAbortExecutor;
import org.apache.paimon.io.RollingFileWriter;
import org.apache.paimon.io.RollingFileWriterImpl;
import org.apache.paimon.io.RowDataFileWriter;
import org.apache.paimon.io.SingleFileWriter;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
import org.apache.paimon.manifest.FileSource;
import org.apache.paimon.statistics.NoneSimpleColStatsCollector;
import org.apache.paimon.statistics.SimpleColStatsCollector;
@@ -91,7 +91,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
private final long targetFileSize;
// State management
- private final List<AbortExecutor> closedWriters;
+ private final List<FileWriterAbortExecutor> closedWriters;
private final List<DataFileMeta> results;
private PeojectedFileWriter<SingleFileWriter<InternalRow, DataFileMeta>,
DataFileMeta>
currentWriter;
@@ -316,7 +316,7 @@ public class RollingBlobFileWriter implements
RollingFileWriter<InternalRow, Dat
currentWriter.abort();
currentWriter = null;
}
- for (AbortExecutor abortExecutor : closedWriters) {
+ for (FileWriterAbortExecutor abortExecutor : closedWriters) {
abortExecutor.abort();
}
blobWriter.abort();
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java
new file mode 100644
index 0000000000..4c10608c11
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/io/FileWriterAbortExecutor.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.Path;
+
+/** Abort executor that keeps only a reference to the path instead of the whole writer. */
+public class FileWriterAbortExecutor {
+ private final FileIO fileIO;
+ private final Path path;
+
+ public FileWriterAbortExecutor(FileIO fileIO, Path path) {
+ this.fileIO = fileIO;
+ this.path = path;
+ }
+
+ public void abort() {
+ fileIO.deleteQuietly(path);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
new file mode 100644
index 0000000000..f081abf310
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableRollingFileWriter.java
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.types.RowType;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * Format table's writer that rolls over to a new file if the current size exceeds
the target file size.
+ */
+public class FormatTableRollingFileWriter implements AutoCloseable {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FormatTableRollingFileWriter.class);
+
+ private static final int CHECK_ROLLING_RECORD_CNT = 1000;
+
+ private final Supplier<FormatTableSingleFileWriter> writerFactory;
+ private final long targetFileSize;
+ private final List<FileWriterAbortExecutor> closedWriters;
+ private final List<TwoPhaseOutputStream.Committer> committers;
+
+ private FormatTableSingleFileWriter currentWriter = null;
+ private long recordCount = 0;
+ private boolean closed = false;
+
+ public FormatTableRollingFileWriter(
+ FileIO fileIO,
+ FileFormat fileFormat,
+ long targetFileSize,
+ RowType writeSchema,
+ DataFilePathFactory pathFactory,
+ String fileCompression) {
+ this.writerFactory =
+ () ->
+ new FormatTableSingleFileWriter(
+ fileIO,
+ fileFormat.createWriterFactory(writeSchema),
+ pathFactory.newPath(),
+ fileCompression);
+ this.targetFileSize = targetFileSize;
+ this.closedWriters = new ArrayList<>();
+ this.committers = new ArrayList<>();
+ }
+
+ public long targetFileSize() {
+ return targetFileSize;
+ }
+
+ public void write(InternalRow row) throws IOException {
+ try {
+ if (currentWriter == null) {
+ currentWriter = writerFactory.get();
+ }
+
+ currentWriter.write(row);
+ recordCount += 1;
+ boolean needRolling =
+ currentWriter.reachTargetSize(
+ recordCount % CHECK_ROLLING_RECORD_CNT == 0,
targetFileSize);
+ if (needRolling) {
+ closeCurrentWriter();
+ }
+ } catch (Throwable e) {
+ LOG.warn(
+ "Exception occurs when writing file {}. Cleaning up.",
+ currentWriter == null ? null : currentWriter.path(),
+ e);
+ abort();
+ throw e;
+ }
+ }
+
+ private void closeCurrentWriter() throws IOException {
+ if (currentWriter == null) {
+ return;
+ }
+
+ currentWriter.close();
+ closedWriters.add(currentWriter.abortExecutor());
+ if (currentWriter.committers() != null) {
+ committers.addAll(currentWriter.committers());
+ }
+
+ currentWriter = null;
+ }
+
+ public void abort() {
+ if (currentWriter != null) {
+ currentWriter.abort();
+ }
+ for (FileWriterAbortExecutor abortExecutor : closedWriters) {
+ abortExecutor.abort();
+ }
+ }
+
+ public List<TwoPhaseOutputStream.Committer> committers() {
+ return committers;
+ }
+
+ @Override
+ public void close() throws IOException {
+ if (closed) {
+ return;
+ }
+
+ try {
+ closeCurrentWriter();
+ } catch (IOException e) {
+ LOG.warn(
+ "Exception occurs when writing file {}. Cleaning up.",
currentWriter.path(), e);
+ abort();
+ throw e;
+ } finally {
+ closed = true;
+ }
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
similarity index 55%
copy from paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
copy to
paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
index c9bf104249..1e204f1f22 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++
b/paimon-core/src/main/java/org/apache/paimon/io/FormatTableSingleFileWriter.java
@@ -19,63 +19,49 @@
package org.apache.paimon.io;
import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.format.BundleFormatWriter;
import org.apache.paimon.format.FormatWriter;
import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.SupportsDirectWrite;
-import org.apache.paimon.fs.AsyncPositionOutputStream;
import org.apache.paimon.fs.FileIO;
import org.apache.paimon.fs.Path;
import org.apache.paimon.fs.PositionOutputStream;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
import org.apache.paimon.utils.IOUtils;
+import org.apache.paimon.shade.guava30.com.google.common.collect.Lists;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.io.UncheckedIOException;
-import java.util.function.Function;
+import java.util.List;
-/**
- * A {@link FileWriter} to produce a single file.
- *
- * @param <T> type of records to write.
- * @param <R> type of result to produce after writing a file.
- */
-public abstract class SingleFileWriter<T, R> implements FileWriter<T, R> {
+/** Format table's writer to produce a single file. */
+public class FormatTableSingleFileWriter {
- private static final Logger LOG =
LoggerFactory.getLogger(SingleFileWriter.class);
+ private static final Logger LOG =
LoggerFactory.getLogger(FormatTableSingleFileWriter.class);
protected final FileIO fileIO;
protected final Path path;
- private final Function<T, InternalRow> converter;
private FormatWriter writer;
private PositionOutputStream out;
+ private TwoPhaseOutputStream.Committer committer;
protected long outputBytes;
- private long recordCount;
protected boolean closed;
- public SingleFileWriter(
- FileIO fileIO,
- FormatWriterFactory factory,
- Path path,
- Function<T, InternalRow> converter,
- String compression,
- boolean asyncWrite) {
+ public FormatTableSingleFileWriter(
+ FileIO fileIO, FormatWriterFactory factory, Path path, String
compression) {
this.fileIO = fileIO;
this.path = path;
- this.converter = converter;
try {
if (factory instanceof SupportsDirectWrite) {
- writer = ((SupportsDirectWrite) factory).create(fileIO, path,
compression);
+ throw new UnsupportedOperationException("Does not support
SupportsDirectWrite.");
} else {
- out = fileIO.newOutputStream(path, false);
- if (asyncWrite) {
- out = new AsyncPositionOutputStream(out);
- }
+ out = fileIO.newTwoPhaseOutputStream(path, false);
writer = factory.create(out, compression);
}
} catch (IOException e) {
@@ -88,7 +74,6 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
throw new UncheckedIOException(e);
}
- this.recordCount = 0;
this.closed = false;
}
@@ -96,80 +81,62 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
return path;
}
- @Override
- public void write(T record) throws IOException {
- writeImpl(record);
- }
-
- public void writeBundle(BundleRecords bundle) throws IOException {
- if (closed) {
- throw new RuntimeException("Writer has already closed!");
- }
-
- try {
- if (writer instanceof BundleFormatWriter) {
- ((BundleFormatWriter) writer).writeBundle(bundle);
- } else {
- for (InternalRow row : bundle) {
- writer.addElement(row);
- }
- }
- recordCount += bundle.rowCount();
- } catch (Throwable e) {
- LOG.warn("Exception occurs when writing file " + path + ".
Cleaning up.", e);
- abort();
- throw e;
- }
- }
-
- protected InternalRow writeImpl(T record) throws IOException {
+ public void write(InternalRow record) throws IOException {
if (closed) {
throw new RuntimeException("Writer has already closed!");
}
try {
- InternalRow rowData = converter.apply(record);
- writer.addElement(rowData);
- recordCount++;
- return rowData;
+ writer.addElement(record);
} catch (Throwable e) {
- LOG.warn("Exception occurs when writing file " + path + ".
Cleaning up.", e);
+ LOG.warn("Exception occurs when writing file {}. Cleaning up.",
path, e);
abort();
throw e;
}
}
- @Override
- public long recordCount() {
- return recordCount;
- }
-
public boolean reachTargetSize(boolean suggestedCheck, long targetSize)
throws IOException {
return writer.reachTargetSize(suggestedCheck, targetSize);
}
- @Override
public void abort() {
if (writer != null) {
IOUtils.closeQuietly(writer);
writer = null;
}
if (out != null) {
- IOUtils.closeQuietly(out);
+ try {
+ committer = ((TwoPhaseOutputStream) out).closeForCommit();
+ } catch (Throwable e) {
+ LOG.warn("Exception occurs when close for commit out {}",
committer, e);
+ }
out = null;
}
+ if (committer != null) {
+ try {
+ committer.discard();
+ } catch (Throwable e) {
+ LOG.warn("Exception occurs when close out {}", committer, e);
+ }
+ }
fileIO.deleteQuietly(path);
}
- public AbortExecutor abortExecutor() {
+ public List<TwoPhaseOutputStream.Committer> committers() {
+ if (!closed) {
+ throw new RuntimeException("Writer should be closed before getting
committer!");
+ }
+ return Lists.newArrayList(committer);
+ }
+
+ public FileWriterAbortExecutor abortExecutor() {
if (!closed) {
throw new RuntimeException("Writer should be closed!");
}
- return new AbortExecutor(fileIO, path);
+ return new FileWriterAbortExecutor(fileIO, path);
}
- @Override
public void close() throws IOException {
if (closed) {
return;
@@ -187,7 +154,7 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
if (out != null) {
out.flush();
outputBytes = out.getPos();
- out.close();
+ committer = ((TwoPhaseOutputStream) out).closeForCommit();
out = null;
}
} catch (IOException e) {
@@ -198,20 +165,4 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
closed = true;
}
}
-
- /** Abort executor to just have reference of path instead of whole writer.
*/
- public static class AbortExecutor {
-
- private final FileIO fileIO;
- private final Path path;
-
- private AbortExecutor(FileIO fileIO, Path path) {
- this.fileIO = fileIO;
- this.path = path;
- }
-
- public void abort() {
- fileIO.deleteQuietly(path);
- }
- }
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
index 76ee44b731..7353ea1971 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/RollingFileWriterImpl.java
@@ -19,7 +19,6 @@
package org.apache.paimon.io;
import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.io.SingleFileWriter.AbortExecutor;
import org.apache.paimon.utils.Preconditions;
import org.slf4j.Logger;
@@ -42,7 +41,7 @@ public class RollingFileWriterImpl<T, R> implements
RollingFileWriter<T, R> {
private final Supplier<? extends SingleFileWriter<T, R>> writerFactory;
private final long targetFileSize;
- private final List<AbortExecutor> closedWriters;
+ private final List<FileWriterAbortExecutor> closedWriters;
protected final List<R> results;
private SingleFileWriter<T, R> currentWriter = null;
@@ -145,7 +144,7 @@ public class RollingFileWriterImpl<T, R> implements
RollingFileWriter<T, R> {
if (currentWriter != null) {
currentWriter.abort();
}
- for (AbortExecutor abortExecutor : closedWriters) {
+ for (FileWriterAbortExecutor abortExecutor : closedWriters) {
abortExecutor.abort();
}
}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
index c9bf104249..3c3b11d8f2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/io/SingleFileWriter.java
@@ -161,12 +161,12 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
fileIO.deleteQuietly(path);
}
- public AbortExecutor abortExecutor() {
+ public FileWriterAbortExecutor abortExecutor() {
if (!closed) {
throw new RuntimeException("Writer should be closed!");
}
- return new AbortExecutor(fileIO, path);
+ return new FileWriterAbortExecutor(fileIO, path);
}
@Override
@@ -198,20 +198,4 @@ public abstract class SingleFileWriter<T, R> implements
FileWriter<T, R> {
closed = true;
}
}
-
- /** Abort executor to just have reference of path instead of whole writer.
*/
- public static class AbortExecutor {
-
- private final FileIO fileIO;
- private final Path path;
-
- private AbortExecutor(FileIO fileIO, Path path) {
- this.fileIO = fileIO;
- this.path = path;
- }
-
- public void abort() {
- fileIO.deleteQuietly(path);
- }
- }
}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
index d3b6881246..2963dbb04d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FormatTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.manifest.IndexManifestEntry;
import org.apache.paimon.manifest.ManifestEntry;
import org.apache.paimon.manifest.ManifestFileMeta;
import org.apache.paimon.stats.Statistics;
+import org.apache.paimon.table.format.FormatBatchWriteBuilder;
import org.apache.paimon.table.format.FormatReadBuilder;
import org.apache.paimon.table.sink.BatchWriteBuilder;
import org.apache.paimon.table.sink.StreamWriteBuilder;
@@ -257,7 +258,7 @@ public interface FormatTable extends Table {
@Override
default BatchWriteBuilder newBatchWriteBuilder() {
- throw new UnsupportedOperationException();
+ return new FormatBatchWriteBuilder(this);
}
default RowType partitionType() {
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
new file mode 100644
index 0000000000..abb10f0535
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatBatchWriteBuilder.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.table.FormatTable;
+import org.apache.paimon.table.sink.BatchTableCommit;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.WriteSelector;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** An implementation of {@link BatchWriteBuilder} for format tables. */
+public class FormatBatchWriteBuilder implements BatchWriteBuilder {
+
+ private static final long serialVersionUID = 1L;
+
+ private final FormatTable table;
+ protected final CoreOptions options;
+
+ public FormatBatchWriteBuilder(FormatTable table) {
+ this.table = table;
+ this.options = new CoreOptions(table.options());
+ }
+
+ @Override
+ public String tableName() {
+ return table.name();
+ }
+
+ @Override
+ public RowType rowType() {
+ return table.rowType();
+ }
+
+ @Override
+ public Optional<WriteSelector> newWriteSelector() {
+ return table.newBatchWriteBuilder().newWriteSelector();
+ }
+
+ @Override
+ public BatchTableWrite newWrite() {
+ return new FormatTableWrite(
+ table.fileIO(),
+ rowType(),
+ this.options,
+ table.partitionType(),
+ table.partitionKeys());
+ }
+
+ @Override
+ public BatchTableCommit newCommit() {
+ throw new UnsupportedOperationException("FormatTable does not support
commit");
+ }
+
+ @Override
+ public BatchWriteBuilder withOverwrite(@Nullable Map<String, String>
staticPartition) {
+ throw new UnsupportedOperationException("FormatTable does not support
commit");
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
new file mode 100644
index 0000000000..3d6af1b227
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableFileWriter.java
@@ -0,0 +1,118 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.types.RowType;
+import org.apache.paimon.utils.FileStorePathFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.format.FileFormat.fileFormat;
+
+/** File writer for format table. */
+public class FormatTableFileWriter {
+
+ private final FileIO fileIO;
+ private RowType rowType;
+ private RowType partitionType;
+ private final FileFormat fileFormat;
+ private final FileStorePathFactory pathFactory;
+ protected final Map<BinaryRow, FormatTableRecordWriter> writers;
+ protected final CoreOptions options;
+
+ public FormatTableFileWriter(
+ FileIO fileIO, RowType rowType, CoreOptions options, RowType
partitionType) {
+ this.fileIO = fileIO;
+ this.rowType = rowType;
+ this.partitionType = partitionType;
+ this.fileFormat = fileFormat(options);
+ this.writers = new HashMap<>();
+ this.options = options;
+ this.pathFactory =
+ new FileStorePathFactory(
+ options.path(),
+ partitionType,
+ options.partitionDefaultName(),
+ options.fileFormatString(),
+ options.dataFilePrefix(),
+ options.changelogFilePrefix(),
+ options.legacyPartitionName(),
+ options.fileSuffixIncludeCompression(),
+ options.fileCompression(),
+ options.dataFilePathDirectory(),
+ null,
+ false);
+ }
+
+ public void withWriteType(RowType writeType) {
+ this.rowType = writeType;
+ }
+
+ public void write(BinaryRow partition, InternalRow data) throws Exception {
+ FormatTableRecordWriter writer = writers.get(partition);
+ if (writer == null) {
+ writer = createWriter(partition.copy());
+ writers.put(partition.copy(), writer);
+ }
+ writer.write(data);
+ }
+
+ public void close() throws Exception {
+ writers.clear();
+ }
+
+ public List<CommitMessage> prepareCommit() throws Exception {
+ List<CommitMessage> commitMessages = new ArrayList<>();
+ for (FormatTableRecordWriter writer : writers.values()) {
+ List<TwoPhaseOutputStream.Committer> commiters =
writer.closeAndGetCommitters();
+ for (TwoPhaseOutputStream.Committer committer : commiters) {
+ TwoPhaseCommitMessage twoPhaseCommitMessage = new
TwoPhaseCommitMessage(committer);
+ commitMessages.add(twoPhaseCommitMessage);
+ }
+ }
+ return commitMessages;
+ }
+
+ private FormatTableRecordWriter createWriter(BinaryRow partition) {
+ RowType writeRowType =
+ rowType.project(
+ rowType.getFieldNames().stream()
+ .filter(name ->
!partitionType.getFieldNames().contains(name))
+ .collect(Collectors.toList()));
+ return new FormatTableRecordWriter(
+ fileIO,
+ fileFormat,
+ options.targetFileSize(false),
+ pathFactory.createFormatTableDataFilePathFactory(
+ partition,
options.formatTablePartitionOnlyValueInPath()),
+ writeRowType,
+ options.fileCompression());
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
new file mode 100644
index 0000000000..b611f2805d
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableRecordWriter.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.format.FileFormat;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.io.DataFilePathFactory;
+import org.apache.paimon.io.FormatTableRollingFileWriter;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Record writer for a format table. */
+public class FormatTableRecordWriter implements AutoCloseable {
+
+ private final FileIO fileIO;
+ private final DataFilePathFactory pathFactory;
+ private final RowType writeSchema;
+ private final String fileCompression;
+ private final FileFormat fileFormat;
+ private final long targetFileSize;
+ private FormatTableRollingFileWriter writer;
+
+ public FormatTableRecordWriter(
+ FileIO fileIO,
+ FileFormat fileFormat,
+ long targetFileSize,
+ DataFilePathFactory pathFactory,
+ RowType writeSchema,
+ String fileCompression) {
+ this.fileIO = fileIO;
+ this.pathFactory = pathFactory;
+ this.fileCompression = fileCompression;
+ this.writeSchema = writeSchema;
+ this.fileFormat = fileFormat;
+ this.targetFileSize = targetFileSize;
+ }
+
+ public void write(InternalRow data) throws Exception {
+ if (writer == null) {
+ writer = createRollingRowWriter();
+ }
+ writer.write(data);
+ }
+
+ public List<TwoPhaseOutputStream.Committer> closeAndGetCommitters() throws
Exception {
+ List<TwoPhaseOutputStream.Committer> commits = new ArrayList<>();
+ if (writer != null) {
+ writer.close();
+ commits.addAll(writer.committers());
+ writer = null;
+ }
+ return commits;
+ }
+
+ @Override
+ public void close() throws Exception {
+ if (writer != null) {
+ writer.abort();
+ writer = null;
+ }
+ }
+
+ private FormatTableRollingFileWriter createRollingRowWriter() {
+ return new FormatTableRollingFileWriter(
+ fileIO, fileFormat, targetFileSize, writeSchema, pathFactory,
fileCompression);
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
new file mode 100644
index 0000000000..ff88c52d66
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/FormatTableWrite.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.casting.DefaultValueRow;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.io.BundleRecords;
+import org.apache.paimon.memory.MemoryPoolFactory;
+import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.table.sink.BatchTableWrite;
+import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.FormatTableRowPartitionKeyExtractor;
+import org.apache.paimon.table.sink.TableWrite;
+import org.apache.paimon.types.DataField;
+import org.apache.paimon.types.RowType;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** {@link TableWrite} implementation for format table. */
+public class FormatTableWrite implements BatchTableWrite {
+
+ private RowType rowType;
+ private final FormatTableFileWriter write;
+ private final FormatTableRowPartitionKeyExtractor partitionKeyExtractor;
+
+ private final int[] notNullFieldIndex;
+ private final @Nullable DefaultValueRow defaultValueRow;
+
+ public FormatTableWrite(
+ FileIO fileIO,
+ RowType rowType,
+ CoreOptions options,
+ RowType partitionType,
+ List<String> partitionKeys) {
+ this.rowType = rowType;
+ this.write = new FormatTableFileWriter(fileIO, rowType, options,
partitionType);
+ this.partitionKeyExtractor =
+ new FormatTableRowPartitionKeyExtractor(rowType,
partitionKeys);
+ List<String> notNullColumnNames =
+ rowType.getFields().stream()
+ .filter(field -> !field.type().isNullable())
+ .map(DataField::name)
+ .collect(Collectors.toList());
+ this.notNullFieldIndex = rowType.getFieldIndices(notNullColumnNames);
+ this.defaultValueRow = DefaultValueRow.create(rowType);
+ }
+
+ @Override
+ public BatchTableWrite withWriteType(RowType writeType) {
+ write.withWriteType(writeType);
+ return this;
+ }
+
+ @Override
+ public BinaryRow getPartition(InternalRow row) {
+ return partitionKeyExtractor.partition(row);
+ }
+
+ @Override
+ public void write(InternalRow row) throws Exception {
+ // checkNullability
+ for (int idx : notNullFieldIndex) {
+ if (row.isNullAt(idx)) {
+ String columnName = rowType.getFields().get(idx).name();
+ throw new RuntimeException(
+ String.format("Cannot write null to non-null
column(%s)", columnName));
+ }
+ }
+ row = defaultValueRow == null ? row : defaultValueRow.replaceRow(row);
+ BinaryRow partition = partitionKeyExtractor.partition(row);
+ write.write(partition, row);
+ }
+
+ @Override
+ public List<CommitMessage> prepareCommit() throws Exception {
+ return write.prepareCommit();
+ }
+
+ public void commit(List<CommitMessage> commitMessages) throws Exception {
+ for (CommitMessage commitMessage : commitMessages) {
+ if (commitMessage instanceof TwoPhaseCommitMessage) {
+ TwoPhaseCommitMessage twoPhaseCommitMessage =
(TwoPhaseCommitMessage) commitMessage;
+ twoPhaseCommitMessage.getCommitter().commit();
+ } else {
+ throw new RuntimeException(
+ "Unsupported commit message type: " +
commitMessage.getClass().getName());
+ }
+ }
+ }
+
+ @Override
+ public void close() throws Exception {
+ write.close();
+ }
+
+ @Override
+ public int getBucket(InternalRow row) {
+ return 0;
+ }
+
+ @Override
+ public TableWrite withMemoryPoolFactory(MemoryPoolFactory
memoryPoolFactory) {
+ return this;
+ }
+
+ @Override
+ public TableWrite withIOManager(IOManager ioManager) {
+ return this;
+ }
+
+ @Override
+ public void write(InternalRow row, int bucket) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void writeBundle(BinaryRow partition, int bucket, BundleRecords
bundle)
+ throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void compact(BinaryRow partition, int bucket, boolean
fullCompaction) throws Exception {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public TableWrite withMetricRegistry(MetricRegistry registry) {
+ return this;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
b/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
new file mode 100644
index 0000000000..cc43636d2f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/format/TwoPhaseCommitMessage.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.format;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.fs.TwoPhaseOutputStream;
+import org.apache.paimon.table.sink.CommitMessage;
+
+import javax.annotation.Nullable;
+
+/** {@link CommitMessage} implementation for format table. */
+public class TwoPhaseCommitMessage implements CommitMessage {
+ private TwoPhaseOutputStream.Committer committer;
+
+ public TwoPhaseCommitMessage(TwoPhaseOutputStream.Committer committer) {
+ this.committer = committer;
+ }
+
+ @Override
+ public BinaryRow partition() {
+ return null;
+ }
+
+ @Override
+ public int bucket() {
+ return 0;
+ }
+
+ @Override
+ public @Nullable Integer totalBuckets() {
+ return 0;
+ }
+
+ public TwoPhaseOutputStream.Committer getCommitter() {
+ return committer;
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
new file mode 100644
index 0000000000..45a4198a3f
--- /dev/null
+++
b/paimon-core/src/main/java/org/apache/paimon/table/sink/FormatTableRowPartitionKeyExtractor.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table.sink;
+
+import org.apache.paimon.codegen.CodeGenUtils;
+import org.apache.paimon.codegen.Projection;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.util.List;
+
+/** A {@link PartitionKeyExtractor} to {@link InternalRow} for format table. */
+public class FormatTableRowPartitionKeyExtractor implements
PartitionKeyExtractor<InternalRow> {
+
+ private final Projection partitionProjection;
+
+ public FormatTableRowPartitionKeyExtractor(RowType rowType, List<String>
partitionKeys) {
+ partitionProjection = CodeGenUtils.newProjection(rowType,
partitionKeys);
+ }
+
+ @Override
+ public BinaryRow partition(InternalRow record) {
+ return partitionProjection.apply(record);
+ }
+
+ @Override
+ public BinaryRow trimmedPrimaryKey(InternalRow record) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
index cb718fc73a..6deb55ef0b 100644
---
a/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
+++
b/paimon-core/src/main/java/org/apache/paimon/utils/FileStorePathFactory.java
@@ -173,9 +173,10 @@ public class FileStorePathFactory {
createExternalPathProvider(partition, bucket));
}
- public DataFilePathFactory createFormatTableDataFilePathFactory(BinaryRow
partition) {
+ public DataFilePathFactory createFormatTableDataFilePathFactory(
+ BinaryRow partition, boolean onlyValue) {
return new DataFilePathFactory(
- partitionPath(partition),
+ partitionPath(partition, onlyValue),
formatIdentifier,
dataFilePrefix,
changelogFilePrefix,
@@ -184,6 +185,15 @@ public class FileStorePathFactory {
createExternalPartitionPathProvider(partition));
}
+ private ExternalPathProvider createExternalPartitionPathProvider(
+ BinaryRow partition, boolean onlyValue) {
+ if (externalPaths == null || externalPaths.isEmpty()) {
+ return null;
+ }
+
+ return new ExternalPathProvider(externalPaths,
partitionPath(partition, onlyValue));
+ }
+
private ExternalPathProvider createExternalPartitionPathProvider(BinaryRow
partition) {
if (externalPaths == null || externalPaths.isEmpty()) {
return null;
@@ -192,9 +202,9 @@ public class FileStorePathFactory {
return new ExternalPathProvider(externalPaths,
partitionPath(partition));
}
- public Path partitionPath(BinaryRow partition) {
+ private Path partitionPath(BinaryRow partition, boolean onlyValue) {
Path relativeBucketPath = null;
- String partitionPath = getPartitionString(partition);
+ String partitionPath = getPartitionString(partition, onlyValue);
if (!partitionPath.isEmpty()) {
relativeBucketPath = new Path(partitionPath);
}
@@ -207,6 +217,10 @@ public class FileStorePathFactory {
return relativeBucketPath != null ? new Path(root, relativeBucketPath)
: root;
}
+ public Path partitionPath(BinaryRow partition) {
+ return partitionPath(partition, false);
+ }
+
@Nullable
private ExternalPathProvider createExternalPathProvider(BinaryRow
partition, int bucket) {
if (externalPaths == null || externalPaths.isEmpty()) {
@@ -248,6 +262,14 @@ public class FileStorePathFactory {
partition, "Partition row data is null. This
is unexpected.")));
}
+ public String getPartitionString(BinaryRow partition, boolean onlyValue) {
+ return PartitionPathUtils.generatePartitionPathUtil(
+ partitionComputer.generatePartValues(
+ Preconditions.checkNotNull(
+ partition, "Partition row data is null. This
is unexpected.")),
+ onlyValue);
+ }
+
// @TODO, need to be changed
public List<Path> getHierarchicalPartitionPath(BinaryRow partition) {
return PartitionPathUtils.generateHierarchicalPartitionPaths(
diff --git
a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
index 9c9e46e98e..c8a540319d 100644
--- a/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
+++ b/paimon-core/src/test/java/org/apache/paimon/catalog/CatalogTestBase.java
@@ -25,19 +25,9 @@ import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.format.FileFormatFactory;
-import org.apache.paimon.format.FormatWriter;
-import org.apache.paimon.format.FormatWriterFactory;
import org.apache.paimon.format.HadoopCompressionType;
-import org.apache.paimon.format.SupportsDirectWrite;
-import org.apache.paimon.format.csv.CsvFileFormatFactory;
-import org.apache.paimon.format.json.JsonFileFormatFactory;
-import org.apache.paimon.format.parquet.ParquetFileFormatFactory;
import org.apache.paimon.fs.FileIO;
-import org.apache.paimon.fs.Path;
-import org.apache.paimon.fs.PositionOutputStream;
import org.apache.paimon.fs.ResolvingFileIO;
-import org.apache.paimon.io.DataFilePathFactory;
import org.apache.paimon.options.CatalogOptions;
import org.apache.paimon.options.ConfigOption;
import org.apache.paimon.options.Options;
@@ -51,9 +41,11 @@ import org.apache.paimon.schema.TableSchema;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.FormatTable;
import org.apache.paimon.table.Table;
+import org.apache.paimon.table.format.FormatTableWrite;
import org.apache.paimon.table.sink.BatchTableCommit;
import org.apache.paimon.table.sink.BatchTableWrite;
import org.apache.paimon.table.sink.BatchWriteBuilder;
+import org.apache.paimon.table.sink.CommitMessage;
import org.apache.paimon.table.source.ReadBuilder;
import org.apache.paimon.table.source.TableScan;
import org.apache.paimon.table.system.AllTableOptionsTable;
@@ -79,7 +71,6 @@ import org.junit.jupiter.params.provider.ValueSource;
import javax.annotation.Nullable;
-import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -620,27 +611,7 @@ public abstract class CatalogTestBase {
dtPartitionValue,
BinaryString.fromString(dt2PartitionValue));
}
- FormatWriterFactory factory =
- (buildFileFormatFactory(format)
- .create(
- new
FileFormatFactory.FormatContext(
- new
Options(table.options()), 1024, 1024)))
-
.createWriterFactory(getFormatTableWriteRowType(table));
- Path partitionPath =
- new Path(
- String.format(
- "%s/%s/%s",
- table.location(), dtPartitionValue,
dt2PartitionValue));
- DataFilePathFactory dataFilePathFactory =
- new DataFilePathFactory(
- partitionPath,
- format,
- "data",
- "change",
- true,
- compressionType.value(),
- null);
- write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
+ writeAndCheckCommitFormatTable(table, datas, null);
List<InternalRow> readAllData = read(table, null, null, null,
null);
assertThat(readAllData).containsExactlyInAnyOrder(datas);
Map<String, String> partitionSpec = new HashMap<>();
@@ -654,7 +625,7 @@ public abstract class CatalogTestBase {
@ParameterizedTest
@ValueSource(booleans = {true, false})
- public void testFormatTableRead(boolean partitioned) throws Exception {
+ public void testFormatTableReadAndWrite(boolean partitioned) throws
Exception {
if (!supportsFormatTable()) {
return;
}
@@ -682,7 +653,7 @@ public abstract class CatalogTestBase {
int[] projection = new int[] {1, 2};
PredicateBuilder builder = new
PredicateBuilder(table.rowType().project(projection));
Predicate predicate = builder.greaterOrEqual(0, 10);
- int size = 5;
+ int size = 2000;
int checkSize = 3;
InternalRow[] datas = new InternalRow[size];
InternalRow[] checkDatas = new InternalRow[checkSize];
@@ -696,59 +667,21 @@ public abstract class CatalogTestBase {
}
InternalRow dataWithDiffPartition =
GenericRow.of(random.nextInt(), random.nextInt(), 11);
- FormatWriterFactory factory =
- (buildFileFormatFactory(format)
- .create(
- new
FileFormatFactory.FormatContext(
- new
Options(table.options()), 1024, 1024)))
-
.createWriterFactory(getFormatTableWriteRowType(table));
Map<String, String> partitionSpec = null;
+ int dataSize = size;
if (partitioned) {
- Path partitionPath =
- new Path(String.format("%s/%s", table.location(),
"dt=" + partitionValue));
- DataFilePathFactory dataFilePathFactory =
- new DataFilePathFactory(
- partitionPath,
- format,
- "data",
- "change",
- true,
- compressionType.value(),
- null);
- Path diffPartitionPath =
- new Path(String.format("%s/%s", table.location(),
"dt=" + 11));
- DataFilePathFactory diffPartitionPathFactory =
- new DataFilePathFactory(
- diffPartitionPath,
- format,
- "data",
- "change",
- true,
- compressionType.value(),
- null);
- write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
- write(
- factory,
- diffPartitionPathFactory.newPath(),
- compressionType.value(),
- dataWithDiffPartition);
+ writeAndCheckCommitFormatTable(table, datas,
dataWithDiffPartition);
+ dataSize = size + 1;
partitionSpec = new HashMap<>();
partitionSpec.put("dt", "" + partitionValue);
} else {
- DataFilePathFactory dataFilePathFactory =
- new DataFilePathFactory(
- new Path(table.location()),
- format,
- "data",
- "change",
- true,
- compressionType.value(),
- null);
- write(factory, dataFilePathFactory.newPath(),
compressionType.value(), datas);
+ writeAndCheckCommitFormatTable(table, datas, null);
}
List<InternalRow> readFilterData =
read(table, predicate, projection, partitionSpec, null);
assertThat(readFilterData).containsExactlyInAnyOrder(checkDatas);
+ List<InternalRow> readallData = read(table, null, null, null,
null);
+ assertThat(readallData).hasSize(dataSize);
int limit = checkSize - 1;
List<InternalRow> readLimitData =
read(table, predicate, projection, partitionSpec, limit);
@@ -767,44 +700,19 @@ public abstract class CatalogTestBase {
}
}
- protected RowType getFormatTableWriteRowType(Table table) {
- return table.rowType()
- .project(
- table.rowType().getFieldNames().stream()
- .filter(name ->
!table.partitionKeys().contains(name))
- .collect(Collectors.toList()));
- }
-
- protected FileFormatFactory buildFileFormatFactory(String format) {
- switch (format) {
- case "csv":
- return new CsvFileFormatFactory();
- case "parquet":
- return new ParquetFileFormatFactory();
- case "json":
- return new JsonFileFormatFactory();
- default:
- throw new IllegalArgumentException("Unsupported format: " +
format);
- }
- }
-
- protected void write(
- FormatWriterFactory factory, Path file, String compression,
InternalRow... rows)
- throws IOException {
- FormatWriter writer;
- PositionOutputStream out = null;
- if (factory instanceof SupportsDirectWrite) {
- writer = ((SupportsDirectWrite) factory).create(fileIO, file,
compression);
- } else {
- out = fileIO.newOutputStream(file, true);
- writer = factory.create(out, compression);
- }
- for (InternalRow row : rows) {
- writer.addElement(row);
- }
- writer.close();
- if (out != null) {
- out.close();
+ private void writeAndCheckCommitFormatTable(
+ Table table, InternalRow[] datas, InternalRow
dataWithDiffPartition) throws Exception {
+ try (FormatTableWrite write = (FormatTableWrite)
table.newBatchWriteBuilder().newWrite()) {
+ for (InternalRow row : datas) {
+ write.write(row);
+ }
+ if (dataWithDiffPartition != null) {
+ write.write(dataWithDiffPartition);
+ }
+ List<CommitMessage> committers = write.prepareCommit();
+ List<InternalRow> readData = read(table, null, null, null, null);
+ assertThat(readData).isEmpty();
+ write.commit(committers);
}
}