This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 5564403d0 [core] Changelog expire should check index file existence (#4186)
5564403d0 is described below

commit 5564403d0f1216e2e66aaa6cf908bae965963e0d
Author: YeJunHao <41894543+leaves12...@users.noreply.github.com>
AuthorDate: Fri Sep 13 19:05:19 2024 +0800

    [core] Changelog expire should check index file existence (#4186)
---
 .../java/org/apache/paimon/consumer/Consumer.java  |  4 +-
 .../apache/paimon/operation/ChangelogDeletion.java |  2 +-
 .../apache/paimon/table/ChangelogExpireTest.java   | 81 ++++++++++++++++++++++
 .../paimon/table/IndexFileExpireTableTest.java     |  2 +-
 4 files changed, 85 insertions(+), 4 deletions(-)

diff --git a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
index bad0e2509..0925d4f8c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
+++ b/paimon-core/src/main/java/org/apache/paimon/consumer/Consumer.java
@@ -61,14 +61,14 @@ public class Consumer {
     public static Optional<Consumer> fromPath(FileIO fileIO, Path path) {
         int retryNumber = 0;
         MismatchedInputException exception = null;
-        while (retryNumber++ < 5) {
+        while (retryNumber++ < 10) {
             try {
                 return fileIO.readOverwrittenFileUtf8(path).map(Consumer::fromJson);
             } catch (MismatchedInputException e) {
                 // retry
                 exception = e;
                 try {
-                    Thread.sleep(100);
+                    Thread.sleep(1_000);
                 } catch (InterruptedException ie) {
                     Thread.currentThread().interrupt();
                     throw new RuntimeException(ie);
diff --git a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
index a73c0a078..c20405ff2 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/ChangelogDeletion.java
@@ -109,7 +109,7 @@ public class ChangelogDeletion extends FileDeletionBase<Changelog> {
 
             // index manifests
             String indexManifest = skippingSnapshot.indexManifest();
-            if (indexManifest != null) {
+            if (indexManifest != null && indexFileHandler.existsManifest(indexManifest)) {
                 skippingSet.add(indexManifest);
                 indexFileHandler.readManifest(indexManifest).stream()
                         .map(IndexManifestEntry::indexFile)
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
new file mode 100644
index 000000000..db6ee7496
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/table/ChangelogExpireTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.fs.Path;
+import org.apache.paimon.options.ExpireConfig;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.utils.TraceableFileIO;
+
+import org.assertj.core.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+import java.util.UUID;
+
+import static org.apache.paimon.options.CatalogOptions.CACHE_ENABLED;
+
+/** Test for changelog expire. */
+public class ChangelogExpireTest extends IndexFileExpireTableTest {
+
+    @BeforeEach
+    public void beforeEachBase() throws Exception {
+        CatalogContext context =
+                CatalogContext.create(
+                        new Path(TraceableFileIO.SCHEME + "://" + tempPath.toString()));
+        context.options().set(CACHE_ENABLED.key(), "false");
+        Catalog catalog = CatalogFactory.createCatalog(context);
+        Identifier identifier = new Identifier("default", "T");
+        catalog.createDatabase(identifier.getDatabaseName(), true);
+        Schema schema =
+                Schema.newBuilder()
+                        .column("pt", DataTypes.INT())
+                        .column("pk", DataTypes.INT())
+                        .column("col1", DataTypes.INT())
+                        .partitionKeys("pt")
+                        .primaryKey("pk", "pt")
+                        .option("changelog-producer", "input")
+                        .option("changelog.num-retained.max", "40")
+                        .option("snapshot.num-retained.max", "39")
+                        .options(tableOptions().toMap())
+                        .build();
+        catalog.createTable(identifier, schema, true);
+        table = (FileStoreTable) catalog.getTable(identifier);
+        commitUser = UUID.randomUUID().toString();
+    }
+
+    @Test
+    public void testChangelogExpire() throws Exception {
+        ExpireConfig expireConfig =
+                ExpireConfig.builder().changelogRetainMax(40).snapshotRetainMax(39).build();
+        prepareExpireTable();
+        ExpireChangelogImpl expire =
+                (ExpireChangelogImpl) table.newExpireChangelog().config(expireConfig);
+
+        ExpireSnapshotsImpl expireSnapshots =
+                (ExpireSnapshotsImpl) table.newExpireSnapshots().config(expireConfig);
+        expireSnapshots.expireUntil(1, 7);
+        Assertions.assertThatCode(() -> expire.expireUntil(1, 6)).doesNotThrowAnyException();
+    }
+}
diff --git a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
index b7117cf0a..4ad634a43 100644
--- a/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
+++ b/paimon-core/src/test/java/org/apache/paimon/table/IndexFileExpireTableTest.java
@@ -190,7 +190,7 @@ public class IndexFileExpireTableTest extends PrimaryKeyTableTestBase {
         assertThat(indexManifestSize()).isEqualTo(1);
     }
 
-    private void prepareExpireTable() throws Exception {
+    protected void prepareExpireTable() throws Exception {
         StreamWriteBuilder writeBuilder = table.newStreamWriteBuilder();
         StreamTableWrite write = writeBuilder.newWrite();
         StreamTableCommit commit = writeBuilder.newCommit();

Reply via email to