This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 116789cbf [core] Improve RocksDBStateFactory bulkLoad exception (#2356)
116789cbf is described below
commit 116789cbf586a6a2f20a668acc46f2a436bb3d7e
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Nov 21 16:46:45 2023 +0800
[core] Improve RocksDBStateFactory bulkLoad exception (#2356)
---
.../apache/paimon/lookup/RocksDBStateFactory.java | 63 +++++++++++-----------
1 file changed, 33 insertions(+), 30 deletions(-)
diff --git a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
index 1bc1edbcd..5ebdd2f92 100644
--- a/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
+++ b/paimon-core/src/main/java/org/apache/paimon/lookup/RocksDBStateFactory.java
@@ -82,42 +82,45 @@ public class RocksDBStateFactory implements Closeable {
}
public void bulkLoad(RocksDBState<?, ?, ?> state, KeyValueIterator<byte[], byte[]> iterator)
- throws IOException {
- try {
- long targetFileSize = options.targetFileSizeBase();
-
- List<String> files = new ArrayList<>();
- SstFileWriter writer = null;
- long recordNum = 0;
- while (iterator.advanceNext()) {
- byte[] key = iterator.getKey();
- byte[] value = iterator.getValue();
-
- if (writer == null) {
- writer = new SstFileWriter(new EnvOptions(), options);
- String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
- writer.open(path);
- files.add(path);
- }
+ throws IOException, RocksDBException {
+ long targetFileSize = options.targetFileSizeBase();
+
+ List<String> files = new ArrayList<>();
+ SstFileWriter writer = null;
+ long recordNum = 0;
+ while (iterator.advanceNext()) {
+ byte[] key = iterator.getKey();
+ byte[] value = iterator.getValue();
+
+ if (writer == null) {
+ writer = new SstFileWriter(new EnvOptions(), options);
+ String path = new File(this.path, "sst-" + (sstIndex++)).getPath();
+ writer.open(path);
+ files.add(path);
+ }
+ try {
writer.put(key, value);
- recordNum++;
- if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
- writer.finish();
- writer = null;
- recordNum = 0;
- }
+ } catch (RocksDBException e) {
+ throw new RuntimeException(
+ "Exception in bulkLoad, the most suspicious reason is that "
+ + "your data contains duplicates, please check your sink table.",
+ e);
}
-
- if (writer != null) {
+ recordNum++;
+ if (recordNum % 1000 == 0 && writer.fileSize() >= targetFileSize) {
writer.finish();
+ writer = null;
+ recordNum = 0;
}
+ }
- if (files.size() > 0) {
- db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
- }
- } catch (Exception e) {
- throw new IOException(e);
+ if (writer != null) {
+ writer.finish();
+ }
+
+ if (files.size() > 0) {
+ db.ingestExternalFile(state.columnFamily, files, new IngestExternalFileOptions());
}
}