This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 116789cbf [core] Improve RocksDBStateFactory bulkLoad exception (#2356)
116789cbf is described below

commit 116789cbf586a6a2f20a668acc46f2a436bb3d7e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 21 16:46:45 2023 +0800

    [core] Improve RocksDBStateFactory bulkLoad exception (#2356)
---
 .../apache/paimon/lookup/RocksDBStateFactory.java  | 63 +++++++++++-----------
 1 file changed, 33 insertions(+), 30 deletions(-)

diff --git 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java 
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
index 1bc1edbcd..5ebdd2f92 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
@@ -82,42 +82,45 @@ public class RocksDBStateFactory implements Closeable {
     }
 
     public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], 
byte[]> iterator)
-            throws IOException {
-        try {
-            long targetFileSize = options.targetFileSizeBase();
-
-            List<String> files = new ArrayList<>();
-            SstFileWriter writer = null;
-            long recordNum = 0;
-            while (iterator.advanceNext()) {
-                byte[] key = iterator.getKey();
-                byte[] value = iterator.getValue();
-
-                if (writer == null) {
-                    writer = new SstFileWriter(new EnvOptions(), options);
-                    String path = new File(this.path, "sst-" + 
(sstIndex++)).getPath();
-                    writer.open(path);
-                    files.add(path);
-                }
+            throws IOException, RocksDBException {
+        long targetFileSize = options.targetFileSizeBase();
+
+        List<String> files = new ArrayList<>();
+        SstFileWriter writer = null;
+        long recordNum = 0;
+        while (iterator.advanceNext()) {
+            byte[] key = iterator.getKey();
+            byte[] value = iterator.getValue();
+
+            if (writer == null) {
+                writer = new SstFileWriter(new EnvOptions(), options);
+                String path = new File(this.path, "sst-" + 
(sstIndex++)).getPath();
+                writer.open(path);
+                files.add(path);
+            }
 
+            try {
                 writer.put(key, value);
-                recordNum++;
-                if (recordNum % 1000 == 0 && writer.fileSize() >= 
targetFileSize) {
-                    writer.finish();
-                    writer = null;
-                    recordNum = 0;
-                }
+            } catch (RocksDBException e) {
+                throw new RuntimeException(
+                        "Exception in bulkLoad, the most suspicious reason is 
that "
+                                + "your data contains duplicates, please check 
your sink table.",
+                        e);
             }
-
-            if (writer != null) {
+            recordNum++;
+            if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
                 writer.finish();
+                writer = null;
+                recordNum = 0;
             }
+        }
 
-            if (files.size() > 0) {
-                db.ingestExternalFile(state.columnFamily, files, new 
IngestExternalFileOptions());
-            }
-        } catch (Exception e) {
-            throw new IOException(e);
+        if (writer != null) {
+            writer.finish();
+        }
+
+        if (files.size() > 0) {
+            db.ingestExternalFile(state.columnFamily, files, new 
IngestExternalFileOptions());
         }
     }
 

Reply via email to