EMsnap commented on code in PR #7878: URL: https://github.com/apache/inlong/pull/7878#discussion_r1174498012
########## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/collections/RocksDBDAO.java: ########## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.sink.collections; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.rocksdb.AbstractImmutableNativeReference; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import 
java.util.List; +import java.util.Spliterators; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Data access objects for storing and retrieving objects in Rocks DB. + */ +public class RocksDBDAO<K, V> { + + private static final Logger LOG = LogManager.getLogger(RocksDBDAO.class); + + private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap; + private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap; + private transient RocksDB rocksDB; + private boolean closed = false; + private final String rocksDBBasePath; + private long totalBytesWritten; + private final TypeSerializer<K> keySerializer; + private final TypeSerializer<V> valueSerializer; + private final DataInputDeserializer inputBuffer; + private final DataOutputSerializer outputBuffer; + + public RocksDBDAO( + String basePath, + String rocksDBBasePath, + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer) { + this.rocksDBBasePath = + String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID()); + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.inputBuffer = new DataInputDeserializer(); + this.outputBuffer = new DataOutputSerializer(4096); + init(); + totalBytesWritten = 0L; + } + + /** + * Create RocksDB if not initialized. + */ + private RocksDB getRocksDB() { + return rocksDB; + } + + /** + * Initialized Rocks DB instance. 
+ */ + private void init() { + try { + LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath); + FileIOUtils.deleteDirectory(new File(rocksDBBasePath)); + + managedHandlesMap = new ConcurrentHashMap<>(); + managedDescriptorMap = new ConcurrentHashMap<>(); + + // If already present, loads the existing column-family handles + + final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true) + .setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics()); + dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) { + + @Override + protected void log(InfoLogLevel infoLogLevel, String logMsg) { + LOG.info("From Rocks DB(" + rocksDBBasePath + ") : " + logMsg); + } + }); + final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions); + final List<ColumnFamilyHandle> managedHandles = new ArrayList<>(); + FileIOUtils.mkdir(new File(rocksDBBasePath)); + rocksDB = RocksDB.open(dbOptions, rocksDBBasePath, managedColumnFamilies, managedHandles); + + Preconditions.checkArgument(managedHandles.size() == managedColumnFamilies.size(), + "Unexpected number of handles are returned"); + for (int index = 0; index < managedHandles.size(); index++) { + ColumnFamilyHandle handle = managedHandles.get(index); + ColumnFamilyDescriptor descriptor = managedColumnFamilies.get(index); + String familyNameFromHandle = new String(handle.getName()); + String familyNameFromDescriptor = new String(descriptor.getName()); + + Preconditions.checkArgument(familyNameFromDescriptor.equals(familyNameFromHandle), + "Family Handles not in order with descriptors"); + managedHandlesMap.put(familyNameFromHandle, handle); + managedDescriptorMap.put(familyNameFromDescriptor, descriptor); + } + } catch (RocksDBException | IOException re) { + LOG.error("Got exception opening Rocks DB instance ", re); + throw new RuntimeException(re); + } + } + + /** + * Helper to load managed column family descriptors. 
+ */ + private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException { + final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>(); + final Options options = new Options(dbOptions, new ColumnFamilyOptions()); + List<byte[]> existing = RocksDB.listColumnFamilies(options, rocksDBBasePath); + + if (existing.isEmpty()) { + LOG.info("No column family found. Loading default"); + managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + } else { + LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList())); + managedColumnFamilies + .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList())); + } + return managedColumnFamilies; + } + + private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFamilyName) { + return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions()); + } + + /** + * Perform a batch write operation. + */ + public void writeBatch(BatchHandler handler) { + try (WriteBatch batch = new WriteBatch()) { + handler.apply(batch); + getRocksDB().write(new WriteOptions(), batch); + } catch (RocksDBException re) { + throw new RuntimeException(re); + } + } + + /** + * Helper to add put operation in batch. + * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Helper to add put operation in batch. 
+ * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, K key, V value) { + try { + byte[] keyBytes = serializeKey(key); + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, K key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), serializeKey(key), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. + * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, String key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), key.getBytes()); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. 
+ * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, K key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, String key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key.getBytes()); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, K key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a prefix search and return stream of key-value pairs retrieved. 
+ * @note This stream must be closed after use, otherwise it will cause a memory leak + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public Stream<Tuple2<K, V>> prefixSearch(String columnFamilyName, byte[] prefix) { + Preconditions.checkArgument(!closed); + final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); + Iterator<Tuple2<K, V>> conditionalIt = new ConditionalIteratorWrapper<>(it, this, prefix); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(conditionalIt, 0), false) + .onClose(() -> it.close()); + } + + /** + * Perform a prefix delete and return stream of key-value pairs retrieved. + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public void prefixDelete(String columnFamilyName, String prefix) { Review Comment: Maybe we shouldn't add code that is not used at present, since it will add to the test cost ########## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/collections/RocksDBKVBuffer.java: ########## @@ -0,0 +1,177 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink.collections; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.Closeable; +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.UUID; +import java.util.stream.Stream; + +/** + * This class provides a disk spillable only kv buffer implementation. + * All of the data is stored using the RocksDB implementation. + */ +public final class RocksDBKVBuffer<T, R> implements Closeable, KVBuffer<T, R>, Serializable { + + // ColumnFamily allows partitioning data within RockDB, which allows + // independent configuration and faster deletes across partitions + // https://github.com/facebook/rocksdb/wiki/Column-Families + // For this use case, we use a single static column family/ partition + private static final Logger LOG = LoggerFactory.getLogger(RocksDBKVBuffer.class); + private static final long serialVersionUID = 1L; + private static final String ROCKSDB_COL_FAMILY = "rocksdb-diskmap"; + + private transient RocksDBDAO<T, R> rocksDb; + private transient boolean closed = false; + private transient Thread shutdownThread = null; + protected String diskMapPath; + protected final TypeSerializer<T> keySerializer; + protected final TypeSerializer<R> valueSerializer; + protected final String rocksDbStoragePath; + + public RocksDBKVBuffer( + TypeSerializer<T> keySerializer, + TypeSerializer<R> valueSerializer, + String rocksDbStoragePath) { + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.rocksDbStoragePath = rocksDbStoragePath; + } + + @Override + public R put(T key, R value) { + checkClosed(); + lazyGetRocksDb().put(ROCKSDB_COL_FAMILY, key, value); + return value; + } + + @Override + public R remove(T key) { + checkClosed(); + R value = 
get(key); + if (value != null) { + lazyGetRocksDb().delete(ROCKSDB_COL_FAMILY, (T) key); + } + return value; + } + + @Override + public R get(T key) { + checkClosed(); + return lazyGetRocksDb().get(ROCKSDB_COL_FAMILY, key); + } + + /** + * Perform a range search according to the prefix of the key, and the KV data of all keys matching the + * prefix will be returned + * @note This stream must be closed after use, otherwise it will cause a memory leak + * + * @param keyPrefix key prefix binary data + * @return + */ + @Override + public Stream<Tuple2<T, R>> scan(byte[] keyPrefix) { + checkClosed(); + return lazyGetRocksDb().prefixSearch(ROCKSDB_COL_FAMILY, keyPrefix); + } + + @Override + public void clear() { + checkClosed(); + // todo: Here use close to mock clear.Because drop column family and recreate column family has memory leak bug. + // In rocksdb 5.x after dropColumnFamily and close, it will wait rocksdb release memory and disk itself. + // So if open a same columnFamily, it will cause memory leak. So you could not close and recreate a + // column family as soon as possible.In rocksdb 6.x it is ok. 
+ if (null != rocksDb) { + LOG.info("Close rocksdb dir in {}", diskMapPath); + rocksDb.close(); + cleanup(false); + } + rocksDb = null; + } + + @Override + public void close() { + if (null != rocksDb) { + LOG.info("Close rocksdb dir in {}", diskMapPath); + rocksDb.close(); + cleanup(false); + } + rocksDb = null; + closed = true; + + } + + private void checkClosed() { + Preconditions.checkArgument(!closed, "Could not operate a close RocksDB KV Buffer"); + } + + private void cleanup() { + this.cleanup(true); + } + + private void addShutDownHook() { + this.shutdownThread = new Thread(this::cleanup); + Runtime.getRuntime().addShutdownHook(this.shutdownThread); + } + + private void cleanup(boolean isTriggeredFromShutdownHook) { + try { + FileIOUtils.deleteDirectory(new File(diskMapPath)); + } catch (IOException var3) { + LOG.warn("Error while deleting the disk map directory=" + this.diskMapPath, var3); + } + + if (!isTriggeredFromShutdownHook && this.shutdownThread != null) { + Runtime.getRuntime().removeShutdownHook(this.shutdownThread); + } + } + + private RocksDBDAO<T, R> lazyGetRocksDb() { + if (null == rocksDb) { Review Comment: rocksDb should be declared volatile when using double-checked locking ########## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/collections/RocksDBDAO.java: ########## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.inlong.sort.iceberg.sink.collections; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.rocksdb.AbstractImmutableNativeReference; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterators; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Data access objects for storing and retrieving objects in Rocks DB. 
+ */ +public class RocksDBDAO<K, V> { + + private static final Logger LOG = LogManager.getLogger(RocksDBDAO.class); + + private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap; + private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap; + private transient RocksDB rocksDB; + private boolean closed = false; + private final String rocksDBBasePath; + private long totalBytesWritten; + private final TypeSerializer<K> keySerializer; + private final TypeSerializer<V> valueSerializer; + private final DataInputDeserializer inputBuffer; + private final DataOutputSerializer outputBuffer; + + public RocksDBDAO( + String basePath, + String rocksDBBasePath, + TypeSerializer<K> keySerializer, + TypeSerializer<V> valueSerializer) { + this.rocksDBBasePath = + String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID()); + this.keySerializer = keySerializer; + this.valueSerializer = valueSerializer; + this.inputBuffer = new DataInputDeserializer(); + this.outputBuffer = new DataOutputSerializer(4096); + init(); + totalBytesWritten = 0L; + } + + /** + * Create RocksDB if not initialized. + */ + private RocksDB getRocksDB() { + return rocksDB; + } + + /** + * Initialized Rocks DB instance. 
+ */ + private void init() { + try { + LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath); + FileIOUtils.deleteDirectory(new File(rocksDBBasePath)); + + managedHandlesMap = new ConcurrentHashMap<>(); + managedDescriptorMap = new ConcurrentHashMap<>(); + + // If already present, loads the existing column-family handles + + final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true) + .setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics()); + dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) { + + @Override + protected void log(InfoLogLevel infoLogLevel, String logMsg) { + LOG.info("From Rocks DB(" + rocksDBBasePath + ") : " + logMsg); + } + }); + final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions); + final List<ColumnFamilyHandle> managedHandles = new ArrayList<>(); + FileIOUtils.mkdir(new File(rocksDBBasePath)); + rocksDB = RocksDB.open(dbOptions, rocksDBBasePath, managedColumnFamilies, managedHandles); + + Preconditions.checkArgument(managedHandles.size() == managedColumnFamilies.size(), + "Unexpected number of handles are returned"); + for (int index = 0; index < managedHandles.size(); index++) { + ColumnFamilyHandle handle = managedHandles.get(index); + ColumnFamilyDescriptor descriptor = managedColumnFamilies.get(index); + String familyNameFromHandle = new String(handle.getName()); + String familyNameFromDescriptor = new String(descriptor.getName()); + + Preconditions.checkArgument(familyNameFromDescriptor.equals(familyNameFromHandle), + "Family Handles not in order with descriptors"); + managedHandlesMap.put(familyNameFromHandle, handle); + managedDescriptorMap.put(familyNameFromDescriptor, descriptor); + } + } catch (RocksDBException | IOException re) { + LOG.error("Got exception opening Rocks DB instance ", re); + throw new RuntimeException(re); + } + } + + /** + * Helper to load managed column family descriptors. 
+ */ + private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException { + final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>(); + final Options options = new Options(dbOptions, new ColumnFamilyOptions()); + List<byte[]> existing = RocksDB.listColumnFamilies(options, rocksDBBasePath); + + if (existing.isEmpty()) { + LOG.info("No column family found. Loading default"); + managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + } else { + LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList())); + managedColumnFamilies + .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList())); + } + return managedColumnFamilies; + } + + private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFamilyName) { + return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions()); + } + + /** + * Perform a batch write operation. + */ + public void writeBatch(BatchHandler handler) { + try (WriteBatch batch = new WriteBatch()) { + handler.apply(batch); + getRocksDB().write(new WriteOptions(), batch); + } catch (RocksDBException re) { + throw new RuntimeException(re); + } + } + + /** + * Helper to add put operation in batch. + * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Helper to add put operation in batch. 
+ * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, K key, V value) { + try { + byte[] keyBytes = serializeKey(key); + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, K key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), serializeKey(key), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. + * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, String key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), key.getBytes()); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. 
+ * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, K key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, String key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key.getBytes()); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, K key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a prefix search and return stream of key-value pairs retrieved. 
+ * @note This stream must be closed after use, otherwise it will cause a memory leak + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public Stream<Tuple2<K, V>> prefixSearch(String columnFamilyName, byte[] prefix) { + Preconditions.checkArgument(!closed); + final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); + Iterator<Tuple2<K, V>> conditionalIt = new ConditionalIteratorWrapper<>(it, this, prefix); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(conditionalIt, 0), false) + .onClose(() -> it.close()); + } + + /** + * Perform a prefix delete and return stream of key-value pairs retrieved. + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public void prefixDelete(String columnFamilyName, String prefix) { + Preconditions.checkArgument(!closed); + LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName); + final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); + it.seek(prefix.getBytes()); + // Find first and last keys to be deleted + String firstEntry = null; + String lastEntry = null; + while (it.isValid() && new String(it.key()).startsWith(prefix)) { + String result = new String(it.key()); + it.next(); + if (firstEntry == null) { + firstEntry = result; + } + lastEntry = result; + } + it.close(); + + if (null != firstEntry) { + try { + // This will not delete the last entry + getRocksDB().deleteRange(managedHandlesMap.get(columnFamilyName), firstEntry.getBytes(), + lastEntry.getBytes()); + // Delete the last entry + getRocksDB().delete(lastEntry.getBytes()); + } catch (RocksDBException e) { + LOG.error("Got exception performing range delete"); + throw new RuntimeException(e); + } + } + } + + /** + * Add a new column family to store. 
+ * + * @param columnFamilyName Column family name + */ + public void addColumnFamily(String columnFamilyName) { + Preconditions.checkArgument(!closed); + + managedDescriptorMap.computeIfAbsent(columnFamilyName, colFamilyName -> { + try { + ColumnFamilyDescriptor descriptor = getColumnFamilyDescriptor(colFamilyName.getBytes()); + ColumnFamilyHandle handle = getRocksDB().createColumnFamily(descriptor); + managedHandlesMap.put(colFamilyName, handle); + return descriptor; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Note : Does not delete from underlying DB. Just closes the handle. + * + * @param columnFamilyName Column Family Name + */ + public void dropColumnFamily(String columnFamilyName) { Review Comment: same ########## inlong-sort/sort-connectors/iceberg/src/main/java/org/apache/inlong/sort/iceberg/sink/collections/RocksDBDAO.java: ########## @@ -0,0 +1,523 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.inlong.sort.iceberg.sink.collections; + +import org.apache.flink.api.common.typeutils.TypeSerializer; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.core.memory.DataInputDeserializer; +import org.apache.flink.core.memory.DataOutputSerializer; +import org.apache.flink.shaded.guava18.com.google.common.base.Preconditions; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.rocksdb.AbstractImmutableNativeReference; +import org.rocksdb.ColumnFamilyDescriptor; +import org.rocksdb.ColumnFamilyHandle; +import org.rocksdb.ColumnFamilyOptions; +import org.rocksdb.DBOptions; +import org.rocksdb.InfoLogLevel; +import org.rocksdb.Options; +import org.rocksdb.RocksDB; +import org.rocksdb.RocksDBException; +import org.rocksdb.RocksIterator; +import org.rocksdb.Statistics; +import org.rocksdb.WriteBatch; +import org.rocksdb.WriteOptions; + +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Spliterators; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import java.util.stream.StreamSupport; + +/** + * Data access objects for storing and retrieving objects in Rocks DB. 
 */
public class RocksDBDAO<K, V> {

    private static final Logger LOG = LogManager.getLogger(RocksDBDAO.class);

    // Column-family name -> open native handle; populated in init() and by addColumnFamily().
    private transient ConcurrentHashMap<String, ColumnFamilyHandle> managedHandlesMap;
    // Column-family name -> descriptor used to open/create that family.
    private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> managedDescriptorMap;
    private transient RocksDB rocksDB;
    // Set once by close(); every public operation guards on it via Preconditions.
    private boolean closed = false;
    // Unique per-instance directory; wiped in init() and deleted again in close().
    private final String rocksDBBasePath;
    // Running total of serialized VALUE bytes written (keys are not counted) — see serializeValue().
    private long totalBytesWritten;
    private final TypeSerializer<K> keySerializer;
    private final TypeSerializer<V> valueSerializer;
    // NOTE(review): these two buffers are shared across all (de)serialization calls
    // without synchronization — confirm this DAO is only used from a single thread.
    private final DataInputDeserializer inputBuffer;
    private final DataOutputSerializer outputBuffer;

    /**
     * Creates a DAO backed by a fresh RocksDB instance under a unique directory.
     *
     * @param basePath logical base path; '/' is replaced with '_' to build the directory name
     * @param rocksDBBasePath root directory under which the per-instance DB directory is created
     * @param keySerializer serializer used to encode/decode keys
     * @param valueSerializer serializer used to encode/decode values
     */
    public RocksDBDAO(
            String basePath,
            String rocksDBBasePath,
            TypeSerializer<K> keySerializer,
            TypeSerializer<V> valueSerializer) {
        // A random UUID suffix keeps concurrent instances from sharing a directory.
        this.rocksDBBasePath =
                String.format("%s/%s/%s", rocksDBBasePath, basePath.replace("/", "_"), UUID.randomUUID());
        this.keySerializer = keySerializer;
        this.valueSerializer = valueSerializer;
        this.inputBuffer = new DataInputDeserializer();
        this.outputBuffer = new DataOutputSerializer(4096);
        init();
        totalBytesWritten = 0L;
    }

    /**
     * Returns the RocksDB instance opened eagerly by {@link #init()}.
     * Despite the original comment, no lazy creation happens here.
     */
    private RocksDB getRocksDB() {
        return rocksDB;
    }

    /**
     * Initializes the RocksDB instance.
     */
    private void init() {
        try {
            LOG.info("DELETING RocksDB persisted at " + rocksDBBasePath);
            // Always start from a clean slate: any previously persisted state is discarded.
            FileIOUtils.deleteDirectory(new File(rocksDBBasePath));

            managedHandlesMap = new ConcurrentHashMap<>();
            managedDescriptorMap = new ConcurrentHashMap<>();

            // If already present, loads the existing column-family handles

            final DBOptions dbOptions = new DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
                    .setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new Statistics());
            // Route RocksDB's own info log through our logger, tagged with the DB path.
            dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {

                @Override
                protected void log(InfoLogLevel infoLogLevel, String logMsg) {
                    LOG.info("From Rocks DB(" + rocksDBBasePath + ") : " + logMsg);
                }
            });
            // NOTE(review): dbOptions, its Statistics, and the Logger above are native
            // resources that are never closed (not even in close()) — confirm whether this
            // leak matters for long-running Flink jobs.
            final List<ColumnFamilyDescriptor> managedColumnFamilies = loadManagedColumnFamilies(dbOptions);
            final List<ColumnFamilyHandle> managedHandles = new ArrayList<>();
            FileIOUtils.mkdir(new File(rocksDBBasePath));
            rocksDB = RocksDB.open(dbOptions, rocksDBBasePath, managedColumnFamilies, managedHandles);

            // RocksDB.open returns one handle per descriptor, in the same order;
            // verify that invariant before walking the two lists in lock-step.
            Preconditions.checkArgument(managedHandles.size() == managedColumnFamilies.size(),
                    "Unexpected number of handles are returned");
            for (int index = 0; index < managedHandles.size(); index++) {
                ColumnFamilyHandle handle = managedHandles.get(index);
                ColumnFamilyDescriptor descriptor = managedColumnFamilies.get(index);
                String familyNameFromHandle = new String(handle.getName());
                String familyNameFromDescriptor = new String(descriptor.getName());

                Preconditions.checkArgument(familyNameFromDescriptor.equals(familyNameFromHandle),
                        "Family Handles not in order with descriptors");
                managedHandlesMap.put(familyNameFromHandle, handle);
                managedDescriptorMap.put(familyNameFromDescriptor, descriptor);
            }
        } catch (RocksDBException | IOException re) {
            LOG.error("Got exception opening Rocks DB instance ", re);
            throw new RuntimeException(re);
        }
    }

    /**
     * Helper to load managed column family descriptors.
+ */ + private List<ColumnFamilyDescriptor> loadManagedColumnFamilies(DBOptions dbOptions) throws RocksDBException { + final List<ColumnFamilyDescriptor> managedColumnFamilies = new ArrayList<>(); + final Options options = new Options(dbOptions, new ColumnFamilyOptions()); + List<byte[]> existing = RocksDB.listColumnFamilies(options, rocksDBBasePath); + + if (existing.isEmpty()) { + LOG.info("No column family found. Loading default"); + managedColumnFamilies.add(getColumnFamilyDescriptor(RocksDB.DEFAULT_COLUMN_FAMILY)); + } else { + LOG.info("Loading column families :" + existing.stream().map(String::new).collect(Collectors.toList())); + managedColumnFamilies + .addAll(existing.stream().map(RocksDBDAO::getColumnFamilyDescriptor).collect(Collectors.toList())); + } + return managedColumnFamilies; + } + + private static ColumnFamilyDescriptor getColumnFamilyDescriptor(byte[] columnFamilyName) { + return new ColumnFamilyDescriptor(columnFamilyName, new ColumnFamilyOptions()); + } + + /** + * Perform a batch write operation. + */ + public void writeBatch(BatchHandler handler) { + try (WriteBatch batch = new WriteBatch()) { + handler.apply(batch); + getRocksDB().write(new WriteOptions(), batch); + } catch (RocksDBException re) { + throw new RuntimeException(re); + } + } + + /** + * Helper to add put operation in batch. + * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Helper to add put operation in batch. 
+ * + * @param batch Batch Handle + * @param columnFamilyName Column Family + * @param key Key + * @param value Payload + */ + public void putInBatch(WriteBatch batch, String columnFamilyName, K key, V value) { + try { + byte[] keyBytes = serializeKey(key); + byte[] payload = serializeValue(value); + batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, String key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), key.getBytes(), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform single PUT on a column-family. + * + * @param columnFamilyName Column family name + * @param key Key + * @param value Payload + */ + public void put(String columnFamilyName, K key, V value) { + try { + byte[] payload = serializeValue(value); + getRocksDB().put(managedHandlesMap.get(columnFamilyName), serializeKey(key), payload); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. + * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, String key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), key.getBytes()); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a single Delete operation. 
+ * + * @param columnFamilyName Column Family name + * @param key Key to be deleted + */ + public void delete(String columnFamilyName, K key) { + try { + getRocksDB().delete(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, String key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), key.getBytes()); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Retrieve a value for a given key in a column family. + * + * @param columnFamilyName Column Family Name + * @param key Key to be retrieved + */ + public V get(String columnFamilyName, K key) { + Preconditions.checkArgument(!closed); + try { + byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), serializeKey(key)); + return val == null ? null : deserializeValue(val); + } catch (RocksDBException | IOException e) { + throw new RuntimeException(e); + } + } + + /** + * Perform a prefix search and return stream of key-value pairs retrieved. 
+ * @note This stream must be closed after use, otherwise it will cause a memory leak + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public Stream<Tuple2<K, V>> prefixSearch(String columnFamilyName, byte[] prefix) { + Preconditions.checkArgument(!closed); + final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); + Iterator<Tuple2<K, V>> conditionalIt = new ConditionalIteratorWrapper<>(it, this, prefix); + return StreamSupport + .stream(Spliterators.spliteratorUnknownSize(conditionalIt, 0), false) + .onClose(() -> it.close()); + } + + /** + * Perform a prefix delete and return stream of key-value pairs retrieved. + * + * @param columnFamilyName Column Family Name + * @param prefix Prefix Key + */ + public void prefixDelete(String columnFamilyName, String prefix) { + Preconditions.checkArgument(!closed); + LOG.info("Prefix DELETE (query=" + prefix + ") on " + columnFamilyName); + final RocksIterator it = getRocksDB().newIterator(managedHandlesMap.get(columnFamilyName)); + it.seek(prefix.getBytes()); + // Find first and last keys to be deleted + String firstEntry = null; + String lastEntry = null; + while (it.isValid() && new String(it.key()).startsWith(prefix)) { + String result = new String(it.key()); + it.next(); + if (firstEntry == null) { + firstEntry = result; + } + lastEntry = result; + } + it.close(); + + if (null != firstEntry) { + try { + // This will not delete the last entry + getRocksDB().deleteRange(managedHandlesMap.get(columnFamilyName), firstEntry.getBytes(), + lastEntry.getBytes()); + // Delete the last entry + getRocksDB().delete(lastEntry.getBytes()); + } catch (RocksDBException e) { + LOG.error("Got exception performing range delete"); + throw new RuntimeException(e); + } + } + } + + /** + * Add a new column family to store. 
+ * + * @param columnFamilyName Column family name + */ + public void addColumnFamily(String columnFamilyName) { + Preconditions.checkArgument(!closed); + + managedDescriptorMap.computeIfAbsent(columnFamilyName, colFamilyName -> { + try { + ColumnFamilyDescriptor descriptor = getColumnFamilyDescriptor(colFamilyName.getBytes()); + ColumnFamilyHandle handle = getRocksDB().createColumnFamily(descriptor); + managedHandlesMap.put(colFamilyName, handle); + return descriptor; + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + }); + } + + /** + * Note : Does not delete from underlying DB. Just closes the handle. + * + * @param columnFamilyName Column Family Name + */ + public void dropColumnFamily(String columnFamilyName) { + Preconditions.checkArgument(!closed); + + managedDescriptorMap.computeIfPresent(columnFamilyName, (colFamilyName, descriptor) -> { + ColumnFamilyHandle handle = managedHandlesMap.get(colFamilyName); + try { + getRocksDB().dropColumnFamily(handle); + handle.close(); + } catch (RocksDBException e) { + throw new RuntimeException(e); + } + managedHandlesMap.remove(columnFamilyName); + return null; + }); + } + + /** + * Close the DAO object. 
+ */ + public synchronized void close() { + if (!closed) { + closed = true; + managedHandlesMap.values().forEach(AbstractImmutableNativeReference::close); + managedHandlesMap.clear(); + managedDescriptorMap.clear(); + getRocksDB().close(); + try { + FileIOUtils.deleteDirectory(new File(rocksDBBasePath)); + } catch (IOException e) { + throw new RuntimeException(e.getMessage(), e); + } + } + } + + public long getTotalBytesWritten() { + return totalBytesWritten; + } + + private byte[] serializeKey(K key) throws IOException { + keySerializer.serialize(key, outputBuffer); + byte[] keyBytes = outputBuffer.getCopyOfBuffer(); + outputBuffer.clear(); + return keyBytes; + } + + private byte[] serializeValue(V value) throws IOException { + valueSerializer.serialize(value, outputBuffer); + byte[] payload = outputBuffer.getCopyOfBuffer(); + outputBuffer.clear(); + + totalBytesWritten += payload.length; + return payload; + } + + private K deserializeKey(byte[] buffer) throws IOException { + inputBuffer.setBuffer(buffer); + return keySerializer.deserialize(inputBuffer); + } + + private V deserializeValue(byte[] buffer) throws IOException { + inputBuffer.setBuffer(buffer); + return valueSerializer.deserialize(inputBuffer); + } + + String getRocksDBBasePath() { Review Comment: ditto -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
