aokolnychyi commented on a change in pull request #1776:
URL: https://github.com/apache/iceberg/pull/1776#discussion_r524817542
##########
File path: spark3/src/main/java/org/apache/iceberg/spark/source/SparkWrite.java
##########
@@ -236,9 +201,171 @@ protected Table table() {
     return ImmutableList.of();
   }
-  @Override
-  public String toString() {
-    return String.format("IcebergWrite(table=%s, format=%s)", table, format);
+  private abstract class BaseBatchWrite implements BatchWrite {
+    @Override
+    public DataWriterFactory createBatchWriterFactory(PhysicalWriteInfo info) {
+      return createWriterFactory();
+    }
+
+    @Override
+    public void abort(WriterCommitMessage[] messages) {
+      SparkWrite.this.abort(messages);
+    }
+
+    @Override
+    public String toString() {
+      return String.format("IcebergBatchWrite(table=%s, format=%s)", table, format);
+    }
+  }
+
+  private class BatchAppend extends BaseBatchWrite {
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      AppendFiles append = table.newAppend();
+
+      int numFiles = 0;
+      for (DataFile file : files(messages)) {
+        numFiles += 1;
+        append.appendFile(file);
+      }
+
+      commitOperation(append, String.format("append with %d new data files", numFiles));
+    }
+  }
+
+  private class DynamicOverwrite extends BaseBatchWrite {
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      ReplacePartitions dynamicOverwrite = table.newReplacePartitions();
+
+      int numFiles = 0;
+      for (DataFile file : files(messages)) {
+        numFiles += 1;
+        dynamicOverwrite.addFile(file);
+      }
+
+      commitOperation(dynamicOverwrite, String.format("dynamic partition overwrite with %d new data files", numFiles));
+    }
+  }
+
+  private class OverwriteByFilter extends BaseBatchWrite {
+    private final Expression overwriteExpr;
+
+    private OverwriteByFilter(Expression overwriteExpr) {
+      this.overwriteExpr = overwriteExpr;
+    }
+
+    @Override
+    public void commit(WriterCommitMessage[] messages) {
+      OverwriteFiles overwriteFiles = table.newOverwrite();
+      overwriteFiles.overwriteByRowFilter(overwriteExpr);
+
+      int numFiles = 0;
+      for (DataFile file : files(messages)) {
+        numFiles += 1;
+        overwriteFiles.addFile(file);
+      }
+
+      String commitMsg = String.format("overwrite by filter %s with %d new data files", overwriteExpr, numFiles);
+      commitOperation(overwriteFiles, commitMsg);
+    }
+  }
+
+  private abstract class BaseStreamingWrite implements StreamingWrite {
+    private static final String QUERY_ID_PROPERTY = "spark.sql.streaming.queryId";
+    private static final String EPOCH_ID_PROPERTY = "spark.sql.streaming.epochId";
+
+    protected abstract String mode();
+
+    @Override
+    public StreamingDataWriterFactory createStreamingWriterFactory(PhysicalWriteInfo info) {
+      return createWriterFactory();
+    }
+
+    @Override
+    public final void commit(long epochId, WriterCommitMessage[] messages) {
+      LOG.info("Committing epoch {} for query {} in {} mode", epochId, queryId, mode());
+
+      table.refresh();
+      Long lastCommittedEpochId = getLastCommittedEpochId();
+      if (lastCommittedEpochId != null && epochId <= lastCommittedEpochId) {
+        LOG.info("Skipping epoch {} for query {} as it was already committed", epochId, queryId);
+        return;
+      }
+
+      doCommit(epochId, messages);
+    }
+
+    protected abstract void doCommit(long epochId, WriterCommitMessage[] messages);
+
+    protected <T> void commit(SnapshotUpdate<T> snapshotUpdate, long epochId, String description) {
+      snapshotUpdate.set(QUERY_ID_PROPERTY, queryId);
+      snapshotUpdate.set(EPOCH_ID_PROPERTY, Long.toString(epochId));
+      commitOperation(snapshotUpdate, description);
+    }
+
+    private Long getLastCommittedEpochId() {
Review comment:
I kept the old name, but I agree `find` is better. Let me update that.
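
Something along these lines (a sketch only — the actual body is truncated in the diff above — assuming the helper walks the snapshot ancestry and matches on the summary properties set in `commit`; it also assumes `java.util.Map` and `org.apache.iceberg.Snapshot` are in scope):

```java
// Sketch: walk back from the current snapshot and return the epoch ID of the
// last snapshot committed by this query, or null if this query never committed.
private Long findLastCommittedEpochId() {
  Snapshot snapshot = table.currentSnapshot();
  Long lastCommittedEpochId = null;
  while (snapshot != null) {
    Map<String, String> summary = snapshot.summary();
    if (queryId.equals(summary.get(QUERY_ID_PROPERTY))) {
      lastCommittedEpochId = Long.valueOf(summary.get(EPOCH_ID_PROPERTY));
      break;
    }
    Long parentSnapshotId = snapshot.parentId();
    snapshot = parentSnapshotId != null ? table.snapshot(parentSnapshotId) : null;
  }
  return lastCommittedEpochId;
}
```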