This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new ab14a13c3 [core] Avoid 'trying to delete file' commit exception for expired partitions (#3945)
ab14a13c3 is described below

commit ab14a13c34db0d9eb6320391d041c1ef30c0d09a
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Aug 13 15:55:47 2024 +0800

    [core] Avoid 'trying to delete file' commit exception for expired partitions (#3945)
---
 .../paimon/utils/InternalRowPartitionComputer.java |  2 +-
 .../utils/InternalRowPartitionComputerTest.java    |  6 +-
 .../java/org/apache/paimon/manifest/FileEntry.java |  9 ---
 .../org/apache/paimon/mergetree/LookupFile.java    |  4 +-
 .../apache/paimon/operation/FileStoreCommit.java   |  2 +
 .../paimon/operation/FileStoreCommitImpl.java      | 92 ++++++++++++++++------
 .../apache/paimon/operation/PartitionExpire.java   | 19 +++++
 .../PartitionValuesTimeExpireStrategy.java         |  4 +
 .../apache/paimon/table/sink/TableCommitImpl.java  |  1 +
 .../paimon/operation/PartitionExpireTest.java      | 73 ++++++++++++++---
 10 files changed, 161 insertions(+), 51 deletions(-)

diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
index 3211a2e32..881f0f4d1 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/InternalRowPartitionComputer.java
@@ -79,7 +79,7 @@ public class InternalRowPartitionComputer {
         return partValues;
     }
 
-    public static String toSimpleString(
+    public static String partToSimpleString(
             RowType partitionType, BinaryRow partition, String delimiter, int 
maxLength) {
         InternalRow.FieldGetter[] getters = partitionType.fieldGetters();
         StringBuilder builder = new StringBuilder();
diff --git a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
index 5f57dd6cf..771136ce9 100644
--- a/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
+++ b/paimon-common/src/test/java/org/apache/paimon/utils/InternalRowPartitionComputerTest.java
@@ -35,7 +35,7 @@ public class InternalRowPartitionComputerTest {
     public void testPartitionToString() {
         RowType rowType = RowType.of();
         BinaryRow binaryRow = new BinaryRow(0);
-        assertThat(InternalRowPartitionComputer.toSimpleString(rowType, 
binaryRow, "-", 30))
+        assertThat(InternalRowPartitionComputer.partToSimpleString(rowType, 
binaryRow, "-", 30))
                 .isEqualTo("");
 
         rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
@@ -43,7 +43,7 @@ public class InternalRowPartitionComputerTest {
         BinaryRowWriter writer = new BinaryRowWriter(binaryRow);
         writer.writeString(0, BinaryString.fromString("20240731"));
         writer.writeInt(1, 10);
-        assertThat(InternalRowPartitionComputer.toSimpleString(rowType, 
binaryRow, "-", 30))
+        assertThat(InternalRowPartitionComputer.partToSimpleString(rowType, 
binaryRow, "-", 30))
                 .isEqualTo("20240731-10");
 
         rowType = RowType.of(DataTypes.STRING(), DataTypes.INT());
@@ -51,7 +51,7 @@ public class InternalRowPartitionComputerTest {
         writer = new BinaryRowWriter(binaryRow);
         writer.setNullAt(0);
         writer.writeInt(1, 10);
-        assertThat(InternalRowPartitionComputer.toSimpleString(rowType, 
binaryRow, "-", 30))
+        assertThat(InternalRowPartitionComputer.partToSimpleString(rowType, 
binaryRow, "-", 30))
                 .isEqualTo("null-10");
     }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
index 3efc4ea19..145eb93a7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/FileEntry.java
@@ -161,13 +161,4 @@ public interface FileEntry {
                 manifestFiles,
                 manifestReadParallelism);
     }
-
-    static <T extends FileEntry> void assertNoDelete(Collection<T> entries) {
-        for (T entry : entries) {
-            Preconditions.checkState(
-                    entry.kind() != FileKind.DELETE,
-                    "Trying to delete file %s which is not previously added.",
-                    entry.fileName());
-        }
-    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
index 097b18655..7469684da 100644
--- a/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/mergetree/LookupFile.java
@@ -40,7 +40,7 @@ import java.io.UncheckedIOException;
 import java.time.Duration;
 
 import static org.apache.paimon.mergetree.LookupUtils.fileKibiBytes;
-import static 
org.apache.paimon.utils.InternalRowPartitionComputer.toSimpleString;
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Lookup file for cache remote file to local. */
@@ -130,7 +130,7 @@ public class LookupFile {
         if (partition.getFieldCount() == 0) {
             return String.format("%s-%s", bucket, remoteFileName);
         } else {
-            String partitionString = toSimpleString(partitionType, partition, 
"-", 20);
+            String partitionString = partToSimpleString(partitionType, 
partition, "-", 20);
             return String.format("%s-%s-%s", partitionString, bucket, 
remoteFileName);
         }
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
index a63e2b733..e15225793 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommit.java
@@ -37,6 +37,8 @@ public interface FileStoreCommit extends AutoCloseable {
 
     FileStoreCommit ignoreEmptyCommit(boolean ignoreEmptyCommit);
 
+    FileStoreCommit withPartitionExpire(PartitionExpire partitionExpire);
+
     /** Find out which committables need to be retried when recovering from 
the failure. */
     List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables);
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
index 4d72efe9f..3a8d7195c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreCommitImpl.java
@@ -80,6 +80,7 @@ import static org.apache.paimon.deletionvectors.DeletionVectorsIndexFile.DELETIO
 import static org.apache.paimon.index.HashIndexFile.HASH_INDEX;
 import static 
org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.BranchManager.DEFAULT_MAIN_BRANCH;
+import static 
org.apache.paimon.utils.InternalRowPartitionComputer.partToSimpleString;
 
 /**
  * Default implementation of {@link FileStoreCommit}.
@@ -125,15 +126,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
     private final String branchName;
     @Nullable private final Integer manifestReadParallelism;
     private final List<CommitCallback> commitCallbacks;
+    private final StatsFileHandler statsFileHandler;
+    private final BucketMode bucketMode;
 
     @Nullable private Lock lock;
     private boolean ignoreEmptyCommit;
-
     private CommitMetrics commitMetrics;
-
-    private final StatsFileHandler statsFileHandler;
-
-    private final BucketMode bucketMode;
+    @Nullable private PartitionExpire partitionExpire;
 
     public FileStoreCommitImpl(
             FileIO fileIO,
@@ -198,6 +197,12 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         return this;
     }
 
+    @Override
+    public FileStoreCommit withPartitionExpire(PartitionExpire 
partitionExpire) {
+        this.partitionExpire = partitionExpire;
+        return this;
+    }
+
     @Override
     public List<ManifestCommittable> filterCommitted(List<ManifestCommittable> 
committables) {
         // nothing to filter, fast exit
@@ -1055,24 +1060,30 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         List<SimpleFileEntry> allEntries = new ArrayList<>(baseEntries);
         allEntries.addAll(changes);
 
-        Collection<SimpleFileEntry> mergedEntries;
+        java.util.function.Consumer<Throwable> conflictHandler =
+                e -> {
+                    Pair<RuntimeException, RuntimeException> conflictException 
=
+                            createConflictException(
+                                    "File deletion conflicts detected! Give up 
committing.",
+                                    baseCommitUser,
+                                    baseEntries,
+                                    changes,
+                                    e,
+                                    50);
+                    LOG.warn("", conflictException.getLeft());
+                    throw conflictException.getRight();
+                };
+
+        Collection<SimpleFileEntry> mergedEntries = null;
         try {
             // merge manifest entries and also check if the files we want to 
delete are still there
             mergedEntries = FileEntry.mergeEntries(allEntries);
-            FileEntry.assertNoDelete(mergedEntries);
         } catch (Throwable e) {
-            Pair<RuntimeException, RuntimeException> conflictException =
-                    createConflictException(
-                            "File deletion conflicts detected! Give up 
committing.",
-                            baseCommitUser,
-                            baseEntries,
-                            changes,
-                            e,
-                            50);
-            LOG.warn("", conflictException.getLeft());
-            throw conflictException.getRight();
+            conflictHandler.accept(e);
         }
 
+        assertNoDelete(mergedEntries, conflictHandler);
+
         // fast exit for file store without keys
         if (keyComparator == null) {
             return;
@@ -1116,6 +1127,43 @@ public class FileStoreCommitImpl implements FileStoreCommit {
         }
     }
 
+    private void assertNoDelete(
+            Collection<SimpleFileEntry> mergedEntries,
+            java.util.function.Consumer<Throwable> conflictHandler) {
+        try {
+            for (SimpleFileEntry entry : mergedEntries) {
+                Preconditions.checkState(
+                        entry.kind() != FileKind.DELETE,
+                        "Trying to delete file %s which is not previously 
added.",
+                        entry.fileName());
+            }
+        } catch (Throwable e) {
+            if (partitionExpire != null && 
partitionExpire.isValueExpiration()) {
+                Set<BinaryRow> deletedPartitions = new HashSet<>();
+                for (SimpleFileEntry entry : mergedEntries) {
+                    if (entry.kind() == FileKind.DELETE) {
+                        deletedPartitions.add(entry.partition());
+                    }
+                }
+                if (partitionExpire.isValueAllExpired(deletedPartitions)) {
+                    List<String> expiredPartitions =
+                            deletedPartitions.stream()
+                                    .map(
+                                            partition ->
+                                                    partToSimpleString(
+                                                            partitionType, 
partition, "-", 200))
+                                    .collect(Collectors.toList());
+                    throw new RuntimeException(
+                            "You are writing data to expired partitions, and 
you can filter this data to avoid job failover."
+                                    + " Otherwise, continuous expired records 
will cause the job to failover restart continuously."
+                                    + " Expired partitions are: "
+                                    + expiredPartitions);
+                }
+            }
+            conflictHandler.accept(e);
+        }
+    }
+
     /**
      * Construct detailed conflict exception. The returned exception is formed 
of (full exception,
      * simplified exception), The simplified exception is generated when the 
entry length is larger
@@ -1134,19 +1182,13 @@ public class FileStoreCommitImpl implements FileStoreCommit {
                         "Don't panic!",
                         "Conflicts during commits are normal and this failure 
is intended to resolve the conflicts.",
                         "Conflicts are mainly caused by the following 
scenarios:",
-                        "1. Your job is suffering from back-pressuring.",
-                        "   There are too many snapshots waiting to be 
committed "
-                                + "and an exception occurred during the commit 
procedure "
-                                + "(most probably due to checkpoint timeout).",
-                        "   See 
https://paimon.apache.org/docs/master/maintenance/write-performance/ "
-                                + "for how to improve writing performance.",
-                        "2. Multiple jobs are writing into the same partition 
at the same time, "
+                        "1. Multiple jobs are writing into the same partition 
at the same time, "
                                 + "or you use STATEMENT SET to execute 
multiple INSERT statements into the same Paimon table.",
                         "   You'll probably see different base commit user and 
current commit user below.",
                         "   You can use "
                                 + 
"https://paimon.apache.org/docs/master/maintenance/dedicated-compaction#dedicated-compaction-job";
                                 + " to support multiple writing.",
-                        "3. You're recovering from an old savepoint, or you're 
creating multiple jobs from a savepoint.",
+                        "2. You're recovering from an old savepoint, or you're 
creating multiple jobs from a savepoint.",
                         "   The job will fail continuously in this scenario to 
protect metadata from corruption.",
                         "   You can either recover from the latest savepoint, "
                                 + "or you can revert the table to the snapshot 
corresponding to the old savepoint.");
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
index 2f5ca780c..64074a4c1 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/PartitionExpire.java
@@ -19,9 +19,11 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metastore.MetastoreClient;
 import org.apache.paimon.partition.PartitionExpireStrategy;
+import org.apache.paimon.partition.PartitionValuesTimeExpireStrategy;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -31,6 +33,7 @@ import javax.annotation.Nullable;
 import java.time.Duration;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
@@ -86,6 +89,22 @@ public class PartitionExpire {
         return expire(LocalDateTime.now(), commitIdentifier);
     }
 
+    public boolean isValueExpiration() {
+        return strategy instanceof PartitionValuesTimeExpireStrategy;
+    }
+
+    public boolean isValueAllExpired(Collection<BinaryRow> partitions) {
+        PartitionValuesTimeExpireStrategy valuesStrategy =
+                (PartitionValuesTimeExpireStrategy) strategy;
+        LocalDateTime expireDateTime = 
LocalDateTime.now().minus(expirationTime);
+        for (BinaryRow partition : partitions) {
+            if (!valuesStrategy.isExpired(expireDateTime, partition)) {
+                return false;
+            }
+        }
+        return true;
+    }
+
     @VisibleForTesting
     void setLastCheck(LocalDateTime time) {
         lastCheck = time;
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
index 37cbed530..80ae633fd 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionValuesTimeExpireStrategy.java
@@ -61,6 +61,10 @@ public class PartitionValuesTimeExpireStrategy extends PartitionExpireStrategy {
                 .readPartitionEntries();
     }
 
+    public boolean isExpired(LocalDateTime expireDateTime, BinaryRow 
partition) {
+        return new 
PartitionValuesTimePredicate(expireDateTime).test(partition);
+    }
+
     /** The expired partition predicate uses the date-format value of the 
partition. */
     private class PartitionValuesTimePredicate implements PartitionPredicate {
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
index 5e25fcfd0..b4f8fa47d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/TableCommitImpl.java
@@ -106,6 +106,7 @@ public class TableCommitImpl implements InnerTableCommit {
         commit.withLock(lock);
         if (partitionExpire != null) {
             partitionExpire.withLock(lock);
+            commit.withPartitionExpire(partitionExpire);
         }
 
         this.commit = commit;
diff --git a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
index e29bcd34a..931bac59c 100644
--- a/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/operation/PartitionExpireTest.java
@@ -24,11 +24,15 @@ import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
+import org.apache.paimon.io.CompactIncrement;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataIncrement;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.FileStoreTableFactory;
 import org.apache.paimon.table.sink.CommitMessage;
+import org.apache.paimon.table.sink.CommitMessageImpl;
 import org.apache.paimon.table.sink.StreamTableCommit;
 import org.apache.paimon.table.sink.StreamTableWrite;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -55,6 +59,8 @@ import java.util.Map;
 import java.util.UUID;
 import java.util.concurrent.ThreadLocalRandom;
 
+import static java.util.Collections.emptyList;
+import static java.util.Collections.singletonList;
 import static 
org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL;
 import static org.apache.paimon.CoreOptions.PARTITION_EXPIRATION_TIME;
 import static org.apache.paimon.CoreOptions.PARTITION_TIMESTAMP_FORMATTER;
@@ -83,8 +89,8 @@ public class PartitionExpireTest {
                                 schemaManager.createTable(
                                         new Schema(
                                                 
RowType.of(VarCharType.STRING_TYPE).getFields(),
-                                                Collections.emptyList(),
-                                                Collections.emptyList(),
+                                                emptyList(),
+                                                emptyList(),
                                                 Collections.singletonMap(
                                                         
PARTITION_EXPIRATION_TIME.key(), "1 d"),
                                                 "")))
@@ -98,8 +104,8 @@ public class PartitionExpireTest {
         schemaManager.createTable(
                 new Schema(
                         RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
-                        Collections.singletonList("f0"),
-                        Collections.emptyList(),
+                        singletonList("f0"),
+                        emptyList(),
                         Collections.emptyMap(),
                         ""));
         table = FileStoreTableFactory.create(LocalFileIO.create(), path);
@@ -121,8 +127,8 @@ public class PartitionExpireTest {
         schemaManager.createTable(
                 new Schema(
                         RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
-                        Collections.singletonList("f0"),
-                        Collections.emptyList(),
+                        singletonList("f0"),
+                        emptyList(),
                         Collections.emptyMap(),
                         ""));
         table = FileStoreTableFactory.create(LocalFileIO.create(), path);
@@ -158,8 +164,8 @@ public class PartitionExpireTest {
         schemaManager.createTable(
                 new Schema(
                         RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
-                        Collections.singletonList("f0"),
-                        Collections.emptyList(),
+                        singletonList("f0"),
+                        emptyList(),
                         Collections.emptyMap(),
                         ""));
 
@@ -227,6 +233,44 @@ public class PartitionExpireTest {
                 .isEqualTo(allCommits.size() - 1);
     }
 
+    @Test
+    public void testDeleteExpiredPartition() throws Exception {
+        SchemaManager schemaManager = new SchemaManager(LocalFileIO.create(), 
path);
+        schemaManager.createTable(
+                new Schema(
+                        RowType.of(VarCharType.STRING_TYPE, 
VarCharType.STRING_TYPE).getFields(),
+                        singletonList("f0"),
+                        emptyList(),
+                        Collections.emptyMap(),
+                        ""));
+        table = FileStoreTableFactory.create(LocalFileIO.create(), path);
+        table = newExpireTable();
+
+        List<CommitMessage> commitMessages = write("20230101", "11");
+        write("20230105", "51");
+
+        PartitionExpire expire = newExpire();
+        expire.setLastCheck(date(1));
+        expire.expire(date(5), Long.MAX_VALUE);
+        assertThat(read()).containsExactlyInAnyOrder("20230105:51");
+
+        TableCommitImpl commit = table.newCommit("");
+        CommitMessageImpl message = (CommitMessageImpl) commitMessages.get(0);
+        DataFileMeta file = message.newFilesIncrement().newFiles().get(0);
+        CommitMessageImpl newMessage =
+                new CommitMessageImpl(
+                        message.partition(),
+                        message.bucket(),
+                        new DataIncrement(emptyList(), emptyList(), 
emptyList()),
+                        new CompactIncrement(singletonList(file), emptyList(), 
emptyList()));
+
+        assertThatThrownBy(() -> commit.commit(0L, singletonList(newMessage)))
+                .hasMessage(
+                        "You are writing data to expired partitions, and you 
can filter "
+                                + "this data to avoid job failover. Otherwise, 
continuous expired records will cause the"
+                                + " job to failover restart continuously. 
Expired partitions are: [20230101]");
+    }
+
     private List<String> read() throws IOException {
         List<String> ret = new ArrayList<>();
         table.newRead()
@@ -239,21 +283,28 @@ public class PartitionExpireTest {
         return LocalDateTime.of(LocalDate.of(2023, 1, day), LocalTime.MIN);
     }
 
-    private void write(String f0, String f1) throws Exception {
+    private List<CommitMessage> write(String f0, String f1) throws Exception {
         StreamTableWrite write =
                 table.copy(Collections.singletonMap(WRITE_ONLY.key(), 
"true")).newWrite("");
         write.write(GenericRow.of(BinaryString.fromString(f0), 
BinaryString.fromString(f1)));
         TableCommitImpl commit = table.newCommit("");
-        commit.commit(0, write.prepareCommit(true, 0));
+        List<CommitMessage> commitMessages = write.prepareCommit(true, 0);
+        commit.commit(0, commitMessages);
         write.close();
         commit.close();
+
+        return commitMessages;
     }
 
     private PartitionExpire newExpire() {
+        return newExpireTable().store().newPartitionExpire("");
+    }
+
+    private FileStoreTable newExpireTable() {
         Map<String, String> options = new HashMap<>();
         options.put(PARTITION_EXPIRATION_TIME.key(), "2 d");
         options.put(CoreOptions.PARTITION_EXPIRATION_CHECK_INTERVAL.key(), "1 
d");
         options.put(CoreOptions.PARTITION_TIMESTAMP_FORMATTER.key(), 
"yyyyMMdd");
-        return table.copy(options).store().newPartitionExpire("");
+        return table.copy(options);
     }
 }

Reply via email to