This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 0f5744a9 [FLINK-31397] Introduce write-once hash lookup store
0f5744a9 is described below
commit 0f5744a92bc52f52bc0f6644dc063dcb60fe326c
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 14 11:15:55 2023 +0800
[FLINK-31397] Introduce write-once hash lookup store
This closes #593
---
.../table/store/lookup/LookupStoreFactory.java | 38 ++
.../table/store/lookup/LookupStoreReader.java | 29 ++
.../table/store/lookup/LookupStoreWriter.java | 29 ++
.../store/lookup/hash/HashLookupStoreFactory.java | 49 +++
.../store/lookup/hash/HashLookupStoreReader.java | 455 +++++++++++++++++++
.../store/lookup/hash/HashLookupStoreWriter.java | 486 +++++++++++++++++++++
.../flink/table/store/utils/SimpleReadBuffer.java | 43 ++
.../flink/table/store/utils/VarLengthIntUtils.java | 134 ++++++
.../src/main/resources/META-INF/NOTICE | 4 +
.../lookup/hash/HashLookupStoreFactoryTest.java | 421 ++++++++++++++++++
10 files changed, 1688 insertions(+)
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
new file mode 100644
index 00000000..88d2b55d
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A factory of key-value stores for lookup. A store is a single binary file that is written
+ * once and is then ready to be used. This factory provides two interfaces:
+ *
+ * <ul>
+ * <li>Writer: written once to prepare binary file.
+ * <li>Reader: lookup value by key bytes.
+ * </ul>
+ */
+public interface LookupStoreFactory {
+ /** Creates the writer that prepares the binary store in {@code file}. */
+ LookupStoreWriter createWriter(File file) throws IOException;
+ /** Creates the reader that looks up values in the binary store {@code file}. */
+ LookupStoreReader createReader(File file) throws IOException;
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
new file mode 100644
index 00000000..2ceaf0e9
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Reader of a lookup store: finds the value stored for a key given as bytes. */
+public interface LookupStoreReader extends Closeable {
+ /** Looks up the value stored for {@code key}. NOTE(review): the hash-based reader returns
+ * {@code null} for an absent key; confirm that this is the contract of every implementation. */
+ byte[] lookup(byte[] key) throws IOException;
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
new file mode 100644
index 00000000..03fe86a5
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Writer that prepares the binary store file; closing the writer finishes the file. */
+public interface LookupStoreWriter extends Closeable {
+ /** Puts a key-value pair into the store. NOTE(review): the hash-based writer detects a
+ * duplicate key only while building the index in {@code close()}; confirm for other stores. */
+ void put(byte[] key, byte[] value) throws IOException;
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
new file mode 100644
index 00000000..895ab08e
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreFactory;
+import org.apache.flink.table.store.options.MemorySize;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * Hash-based {@link LookupStoreFactory}: records are located on disk through a hash index that
+ * the writer builds and the reader probes.
+ */
+public class HashLookupStoreFactory implements LookupStoreFactory {
+
+    /** Load factor handed to every {@link HashLookupStoreWriter} created by this factory. */
+    private final double loadFactor;
+
+    /** Whether readers created by this factory memory-map the data section. */
+    private final boolean useMmp;
+
+    /** Size of one memory-mapped data segment used by readers. */
+    private final MemorySize mmpSegmentSize;
+
+    public HashLookupStoreFactory(double loadFactor, boolean useMmp, MemorySize mmpSegmentSize) {
+        this.mmpSegmentSize = mmpSegmentSize;
+        this.useMmp = useMmp;
+        this.loadFactor = loadFactor;
+    }
+
+    /** Opens a writer that builds the hash store in {@code file}. */
+    @Override
+    public HashLookupStoreWriter createWriter(File file) throws IOException {
+        final HashLookupStoreWriter writer = new HashLookupStoreWriter(loadFactor, file);
+        return writer;
+    }
+
+    /** Opens a reader over the hash store previously written to {@code file}. */
+    @Override
+    public HashLookupStoreReader createReader(File file) throws IOException {
+        final long segmentBytes = mmpSegmentSize.getBytes();
+        return new HashLookupStoreReader(useMmp, segmentBytes, file);
+    }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
new file mode 100644
index 00000000..390d599b
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.SimpleReadBuffer;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.Map;
+
+/** Internal read implementation for hash kv store. */
+public class HashLookupStoreReader
+ implements LookupStoreReader, Iterable<Map.Entry<byte[], byte[]>> {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HashLookupStoreReader.class.getName());
+
+ // Buffer segment size
+ private final long mmpSegmentSize;
+ // Key count for each key length
+ private final int[] keyCounts;
+ // Slot size for each key length
+ private final int[] slotSizes;
+ // Number of slots for each key length
+ private final int[] slots;
+ // Offset of the index for different key length
+ private final int[] indexOffsets;
+ // Offset of the data in the channel
+ private final long dataOffset;
+ // Offset of the data for different key length
+ private final long[] dataOffsets;
+ // Data size
+ private final long dataSize;
+ // Index and data buffers
+ private MappedByteBuffer indexBuffer;
+ private MappedByteBuffer[] dataBuffers;
+ // FileChannel
+ private RandomAccessFile mappedFile;
+ private FileChannel channel;
+ // Use MMap for data?
+ private final boolean mMapData;
+ // Buffers
+ private final SimpleReadBuffer sizeBuffer = new SimpleReadBuffer(new
byte[5]);
+ private final byte[] slotBuffer;
+
+    /**
+     * Opens the store {@code file}: reads the metadata header, maps the index section and, when
+     * {@code useMmp} is set, maps the data section in segments of {@code mmpSegmentSize} bytes.
+     *
+     * @param useMmp whether values are read through memory-mapped buffers instead of file seeks
+     * @param mmpSegmentSize size in bytes of one mapped data segment
+     * @param file the store file to open
+     * @throws FileNotFoundException if {@code file} does not exist
+     * @throws IllegalArgumentException if {@code mmpSegmentSize} is larger than 2GB, or is not
+     *     positive while {@code useMmp} is set
+     * @throws IOException if the file cannot be read or mapped
+     */
+    HashLookupStoreReader(boolean useMmp, long mmpSegmentSize, File file) throws IOException {
+        // File path
+        if (!file.exists()) {
+            throw new FileNotFoundException("File " + file.getAbsolutePath() + " not found");
+        }
+        LOG.info("Opening file {}", file.getName());
+
+        this.mmpSegmentSize = mmpSegmentSize;
+        // Check valid segmentSize
+        if (this.mmpSegmentSize > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("The mmpSegmentSize can't be larger than 2GB");
+        }
+        // A zero or negative segment size would divide by zero or never terminate the mapping loop
+        if (useMmp && this.mmpSegmentSize <= 0) {
+            throw new IllegalArgumentException(
+                    "The mmpSegmentSize must be positive, but is " + mmpSegmentSize);
+        }
+
+        // Open file and read metadata; the stream is closed even if the header is truncated
+        long createdAt;
+        // Offset of the index in the channel
+        int indexOffset;
+        int keyCount;
+        try (DataInputStream dataInputStream =
+                new DataInputStream(new BufferedInputStream(new FileInputStream(file)))) {
+            // Time
+            createdAt = dataInputStream.readLong();
+
+            // Metadata counters
+            keyCount = dataInputStream.readInt();
+            // Number of different key length
+            int keyLengthCount = dataInputStream.readInt();
+            // Max key length
+            int maxKeyLength = dataInputStream.readInt();
+
+            // Per-key-length tables, indexed directly by key length
+            indexOffsets = new int[maxKeyLength + 1];
+            dataOffsets = new long[maxKeyLength + 1];
+            keyCounts = new int[maxKeyLength + 1];
+            slots = new int[maxKeyLength + 1];
+            slotSizes = new int[maxKeyLength + 1];
+
+            int maxSlotSize = 0;
+            for (int i = 0; i < keyLengthCount; i++) {
+                int keyLength = dataInputStream.readInt();
+
+                keyCounts[keyLength] = dataInputStream.readInt();
+                slots[keyLength] = dataInputStream.readInt();
+                slotSizes[keyLength] = dataInputStream.readInt();
+                indexOffsets[keyLength] = dataInputStream.readInt();
+                dataOffsets[keyLength] = dataInputStream.readLong();
+
+                maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
+            }
+
+            // One scratch buffer large enough for a slot of any key length
+            slotBuffer = new byte[maxSlotSize];
+
+            // Read index and data offset
+            indexOffset = dataInputStream.readInt();
+            dataOffset = dataInputStream.readLong();
+        }
+
+        // Create Mapped file in read-only mode
+        mappedFile = new RandomAccessFile(file, "r");
+        channel = mappedFile.getChannel();
+        long fileSize = file.length();
+        dataSize = fileSize - dataOffset;
+        mMapData = useMmp;
+
+        try {
+            // Create index buffer
+            indexBuffer =
+                    channel.map(
+                            FileChannel.MapMode.READ_ONLY, indexOffset, dataOffset - indexOffset);
+
+            if (mMapData) {
+                // Build data buffers: one mapping per segment, the last one possibly shorter
+                int bufArraySize =
+                        (int) (dataSize / this.mmpSegmentSize)
+                                + ((dataSize % this.mmpSegmentSize != 0) ? 1 : 0);
+                dataBuffers = new MappedByteBuffer[bufArraySize];
+                int bufIdx = 0;
+                for (long offset = 0; offset < dataSize; offset += this.mmpSegmentSize) {
+                    long remainingFileSize = dataSize - offset;
+                    long thisSegmentSize = Math.min(this.mmpSegmentSize, remainingFileSize);
+                    dataBuffers[bufIdx++] =
+                            channel.map(
+                                    FileChannel.MapMode.READ_ONLY,
+                                    dataOffset + offset,
+                                    thisSegmentSize);
+                }
+            } else {
+                // Use classical disk read
+                dataBuffers = null;
+            }
+        } catch (IOException | RuntimeException e) {
+            // Nobody can call close() on a half-constructed reader, so release the file here
+            try {
+                mappedFile.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+
+        // logging
+        if (LOG.isInfoEnabled()) {
+            DecimalFormat sizeFormat = new DecimalFormat("#,##0.00");
+            StringBuilder statMsg = new StringBuilder("Storage metadata\n");
+            statMsg.append(" Created at: ").append(formatCreatedAt(createdAt)).append("\n");
+            statMsg.append(" Key count: ").append(keyCount).append("\n");
+            for (int i = 0; i < keyCounts.length; i++) {
+                if (keyCounts[i] > 0) {
+                    statMsg.append(" Key count for key length ")
+                            .append(i)
+                            .append(": ")
+                            .append(keyCounts[i])
+                            .append("\n");
+                }
+            }
+            statMsg.append(" Index size: ")
+                    .append(sizeFormat.format((dataOffset - indexOffset) / (1024.0 * 1024.0)))
+                    .append(" Mb\n");
+            statMsg.append(" Data size: ")
+                    .append(sizeFormat.format((fileSize - dataOffset) / (1024.0 * 1024.0)))
+                    .append(" Mb\n");
+            if (mMapData) {
+                statMsg.append(" Number of memory mapped data buffers: ")
+                        .append(dataBuffers.length);
+            } else {
+                statMsg.append(" Memory mapped data disabled, using disk");
+            }
+            LOG.info(statMsg.toString());
+        }
+    }
+
+    /**
+     * Finds the value stored for {@code key} by probing the index of that key length linearly,
+     * starting at the slot selected by the key's hash.
+     *
+     * <p>The shared {@code slotBuffer} and the position of {@code indexBuffer} are modified, so
+     * concurrent calls on one reader would interfere with each other.
+     *
+     * @return the value bytes, or {@code null} if the key is not in the store
+     */
+    @Override
+    public byte[] lookup(byte[] key) throws IOException {
+        final int keyLength = key.length;
+        if (keyLength >= slots.length) {
+            return null;
+        }
+        if (keyCounts[keyLength] == 0) {
+            return null;
+        }
+
+        final long hash = MurmurHashUtils.hashBytesPositive(key);
+        final int slotCount = slots[keyLength];
+        final int slotWidth = slotSizes[keyLength];
+        final int indexStart = indexOffsets[keyLength];
+        final long dataStart = dataOffsets[keyLength];
+
+        int probe = 0;
+        while (probe < slotCount) {
+            final int slot = (int) ((hash + probe) % slotCount);
+            indexBuffer.position(indexStart + slot * slotWidth);
+            indexBuffer.get(slotBuffer, 0, slotWidth);
+
+            // The offset is stored right behind the key bytes; 0 marks a slot never filled
+            final long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength);
+            if (offset == 0) {
+                return null;
+            }
+            if (isKey(slotBuffer, key)) {
+                final long valuePosition = dataStart + offset;
+                if (mMapData) {
+                    return getMMapBytes(valuePosition);
+                }
+                return getDiskBytes(valuePosition);
+            }
+            probe++;
+        }
+        return null;
+    }
+
+    /** Tells whether the leading {@code key.length} bytes of {@code slotBuffer} equal the key. */
+    private boolean isKey(byte[] slotBuffer, byte[] key) {
+        int matched = 0;
+        while (matched < key.length && slotBuffer[matched] == key[matched]) {
+            matched++;
+        }
+        return matched == key.length;
+    }
+
+ // Read the length-prefixed value at the given data-section offset; it can be spread over multiple
data buffers
+ private byte[] getMMapBytes(long offset) {
+ // The size prefix is var-length: at most 5 bytes, fewer when the data section ends sooner
+ ByteBuffer buf = getDataBuffer(offset);
+ int maxLen = (int) Math.min(5, dataSize - offset);
+
+ int size;
+ if (buf.remaining() >= maxLen) {
+ // Continuous read: the whole size prefix is inside the current buffer
+ int pos = buf.position();
+ size = VarLengthIntUtils.decodeInt(buf);
+
+ // Advance offset past the prefix bytes consumed, needed if the value spans multiple buffers
+ offset += buf.position() - pos;
+ } else {
+ // The size prefix straddles buffers: gather maxLen bytes into sizeBuffer and decode from it
+ int len = maxLen;
+ int off = 0;
+ sizeBuffer.reset();
+ while (len > 0) {
+ buf = getDataBuffer(offset + off);
+ int count = Math.min(len, buf.remaining());
+ buf.get(sizeBuffer.getBuf(), off, count);
+ off += count;
+ len -= count;
+ }
+ size = VarLengthIntUtils.decodeInt(sizeBuffer);
+ offset += sizeBuffer.getPos();
+ buf = getDataBuffer(offset);
+ }
+
+ // Create output bytes
+ byte[] res = new byte[size];
+
+ // Check if the value lies entirely within the current buffer
+ if (buf.remaining() >= size) {
+ // Continuous read
+ buf.get(res, 0, size);
+ } else {
+ int len = size;
+ int off = 0;
+ while (len > 0) {
+ buf = getDataBuffer(offset);
+ int count = Math.min(len, buf.remaining());
+ buf.get(res, off, count);
+ offset += count;
+ off += count;
+ len -= count;
+ }
+ }
+
+ return res;
+ }
+
+    /**
+     * Reads the length-prefixed value at {@code offset} (relative to the start of the data
+     * section) directly from the file, without memory mapping.
+     *
+     * @param offset position of the value's size prefix inside the data section
+     * @return the value bytes
+     * @throws EOFException if the file ends before the whole value has been read
+     * @throws IOException if seeking or reading fails
+     */
+    private byte[] getDiskBytes(long offset) throws IOException {
+        mappedFile.seek(dataOffset + offset);
+
+        // Get size of data
+        int size = VarLengthIntUtils.decodeInt(mappedFile);
+
+        // Create output bytes
+        byte[] res = new byte[size];
+
+        // A single read() may return fewer bytes than requested, which would hand back a
+        // partly filled array; readFully loops until the array is full or throws EOFException.
+        mappedFile.readFully(res);
+
+        return res;
+    }
+
+ // Return the data buffer for the given position
+ private ByteBuffer getDataBuffer(long index) {
+ ByteBuffer buf = dataBuffers[(int) (index / mmpSegmentSize)];
+ buf.position((int) (index % mmpSegmentSize));
+ return buf;
+ }
+
+ private String formatCreatedAt(long createdAt) {
+ SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at'
HH:mm:ss z");
+ Calendar cl = Calendar.getInstance();
+ cl.setTimeInMillis(createdAt);
+ return sdf.format(cl.getTime());
+ }
+
+    /**
+     * Closes the channel and the underlying file and drops the references to the mapped buffers.
+     * Calling it again after the reader has been closed has no effect.
+     *
+     * @throws IOException if closing the channel or the file fails
+     */
+    @Override
+    public void close() throws IOException {
+        final FileChannel openChannel = channel;
+        final RandomAccessFile openFile = mappedFile;
+
+        // Forget everything first so that a repeated close() finds nothing left to do
+        indexBuffer = null;
+        dataBuffers = null;
+        mappedFile = null;
+        channel = null;
+
+        try {
+            if (openChannel != null) {
+                openChannel.close();
+            }
+        } finally {
+            // Runs even when closing the channel failed, so the file handle is not leaked
+            if (openFile != null) {
+                openFile.close();
+            }
+        }
+    }
+ /** Iterates over all entries of the store; each entry carries the key and its value. */
+ @Override
+ public Iterator<Map.Entry<byte[], byte[]>> iterator() {
+ return new StorageIterator(true);
+ }
+ /** Iterates over the keys only: values are not read and each entry's value is {@code null}. */
+ public Iterator<Map.Entry<byte[], byte[]>> keys() {
+ return new StorageIterator(false);
+ }
+
+    /**
+     * Iterator that walks the index one key length after another and skips empty slots.
+     *
+     * <p>Every call to {@link #next()} returns the same {@code FastEntry} instance, refilled with
+     * the current key and value; callers that keep entries must copy them. The iterator moves the
+     * position of the shared {@code indexBuffer}.
+     */
+    private class StorageIterator implements Iterator<Map.Entry<byte[], byte[]>> {
+
+        private final FastEntry entry = new FastEntry();
+        private final boolean withValue;
+        // Starts below zero so that the first scan also considers keys of length 0
+        private int currentKeyLength = -1;
+        private byte[] currentSlotBuffer;
+        // Number of keys returned so far
+        private long keyIndex;
+        // Number of keys in all key-length groups entered so far
+        private long keyLimit;
+        private long currentDataOffset;
+        private int currentIndexOffset;
+
+        /** @param value whether values are read as well; otherwise entries have a null value */
+        public StorageIterator(boolean value) {
+            withValue = value;
+            nextKeyLength();
+        }
+
+        /** Moves on to the next key length that has at least one key, if there is one. */
+        private void nextKeyLength() {
+            for (int i = currentKeyLength + 1; i < keyCounts.length; i++) {
+                long c = keyCounts[i];
+                if (c > 0) {
+                    currentKeyLength = i;
+                    keyLimit += c;
+                    currentSlotBuffer = new byte[slotSizes[i]];
+                    currentIndexOffset = indexOffsets[i];
+                    currentDataOffset = dataOffsets[i];
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return keyIndex < keyLimit;
+        }
+
+        /**
+         * Returns the next entry.
+         *
+         * @throws java.util.NoSuchElementException if all keys have been returned already
+         */
+        @Override
+        public FastEntry next() {
+            // Without this guard the slot scan below would run past the last filled slot
+            if (!hasNext()) {
+                throw new java.util.NoSuchElementException();
+            }
+            try {
+                indexBuffer.position(currentIndexOffset);
+
+                // Skip slots whose decoded offset is 0, i.e. slots that were never filled
+                long offset = 0;
+                while (offset == 0) {
+                    indexBuffer.get(currentSlotBuffer);
+                    offset = VarLengthIntUtils.decodeLong(currentSlotBuffer, currentKeyLength);
+                    currentIndexOffset += currentSlotBuffer.length;
+                }
+
+                byte[] key = Arrays.copyOf(currentSlotBuffer, currentKeyLength);
+                byte[] value = null;
+
+                if (withValue) {
+                    long valueOffset = currentDataOffset + offset;
+                    value = mMapData ? getMMapBytes(valueOffset) : getDiskBytes(valueOffset);
+                }
+
+                entry.set(key, value);
+
+                if (++keyIndex == keyLimit) {
+                    nextKeyLength();
+                }
+                return entry;
+            } catch (IOException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        /** Mutable entry that the iterator refills for every element. */
+        private class FastEntry implements Map.Entry<byte[], byte[]> {
+
+            private byte[] key;
+            private byte[] val;
+
+            protected void set(byte[] k, byte[] v) {
+                this.key = k;
+                this.val = v;
+            }
+
+            @Override
+            public byte[] getKey() {
+                return key;
+            }
+
+            @Override
+            public byte[] getValue() {
+                return val;
+            }
+
+            @Override
+            public byte[] setValue(byte[] value) {
+                throw new UnsupportedOperationException("Not supported.");
+            }
+        }
+    }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
new file mode 100644
index 00000000..2e0be801
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+ // load factor of hash map, default 0.75
+ private final double loadFactor;
+ // Output
+ private final File tempFolder;
+ private final OutputStream outputStream;
+ // Index stream
+ private File[] indexFiles;
+ private DataOutputStream[] indexStreams;
+ // Data stream
+ private File[] dataFiles;
+ private DataOutputStream[] dataStreams;
+ // Cache last value
+ private byte[][] lastValues;
+ private int[] lastValuesLength;
+ // Data length
+ private long[] dataLengths;
+ // Index length
+ private long indexesLength;
+ // Max offset length
+ private int[] maxOffsetLengths;
+ // Number of keys
+ private int keyCount;
+ private int[] keyCounts;
+ // Number of values
+ private int valueCount;
+ // Number of collisions
+ private int collisions;
+
+    /**
+     * Creates a writer that produces the store in {@code file}. Intermediate files are kept in a
+     * randomly named temporary folder created next to {@code file}.
+     *
+     * @param loadFactor ratio of keys to index slots, strictly between 0.0 and 1.0
+     * @param file the store file to write
+     * @throws IllegalArgumentException if {@code loadFactor} is NaN or outside (0.0, 1.0)
+     * @throws IOException if the temporary folder or the output file cannot be created
+     */
+    HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+        // Written as a negated range check so that NaN, which fails every comparison, is rejected
+        if (!(loadFactor > 0.0 && loadFactor < 1.0)) {
+            throw new IllegalArgumentException(
+                    "Illegal load factor = " + loadFactor + ", should be between 0.0 and 1.0.");
+        }
+        this.loadFactor = loadFactor;
+
+        tempFolder = new File(file.getParentFile(), UUID.randomUUID().toString());
+        if (!tempFolder.mkdir()) {
+            throw new IOException("Can not create temp folder: " + tempFolder);
+        }
+
+        OutputStream output;
+        try {
+            output = new BufferedOutputStream(new FileOutputStream(file));
+        } catch (IOException | RuntimeException e) {
+            // The writer is never handed out, so nothing else would remove the empty folder.
+            // Best effort: a failed delete must not hide the original exception.
+            tempFolder.delete();
+            throw e;
+        }
+        outputStream = output;
+
+        indexStreams = new DataOutputStream[0];
+        dataStreams = new DataOutputStream[0];
+        indexFiles = new File[0];
+        dataFiles = new File[0];
+        lastValues = new byte[0][];
+        lastValuesLength = new int[0];
+        dataLengths = new long[0];
+        maxOffsetLengths = new int[0];
+        keyCounts = new int[0];
+    }
+
+    /**
+     * Records one entry: the key and the offset of its value go to the index stream of the key's
+     * length, the length-prefixed value goes to the data stream of that key length.
+     *
+     * <p>A value equal to the one written last for the same key length is not written again; the
+     * key then points at the existing copy. NOTE(review): the value array is kept by reference
+     * for that comparison, so callers presumably must not modify it afterwards - confirm.
+     */
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        final int keyLength = key.length;
+
+        // Fetch the index stream before touching the per-key-length arrays below; each key
+        // length has its own file
+        final DataOutputStream indexOut = getIndexStream(keyLength);
+        indexOut.write(key);
+
+        // Is this the same value as the one inserted last for this key length?
+        final byte[] previousValue = lastValues[keyLength];
+        final boolean repeatsPrevious =
+                previousValue != null && Arrays.equals(value, previousValue);
+
+        // A repeated value is addressed at the start of the copy already written
+        final long valueOffset =
+                repeatsPrevious
+                        ? dataLengths[keyLength] - lastValuesLength[keyLength]
+                        : dataLengths[keyLength];
+
+        // Store the offset behind the key and remember the widest encoding seen so far
+        final int encodedOffsetBytes = VarLengthIntUtils.encodeLong(indexOut, valueOffset);
+        if (encodedOffsetBytes > maxOffsetLengths[keyLength]) {
+            maxOffsetLengths[keyLength] = encodedOffsetBytes;
+        }
+
+        if (!repeatsPrevious) {
+            final DataOutputStream dataOut = getDataStream(keyLength);
+
+            // Size prefix first, then the value itself
+            final int prefixBytes = VarLengthIntUtils.encodeInt(dataOut, value.length);
+            dataOut.write(value);
+
+            final int recordBytes = prefixBytes + value.length;
+            dataLengths[keyLength] += recordBytes;
+
+            lastValues[keyLength] = value;
+            lastValuesLength[keyLength] = recordBytes;
+
+            valueCount++;
+        }
+
+        keyCount++;
+        keyCounts[keyLength]++;
+    }
+
+    /**
+     * Finishes the store: closes the temporary streams, writes the metadata, builds one index
+     * file per key length and merges metadata, indexes and data files into the output file.
+     *
+     * <p>The output stream is closed and the temporary files are cleaned up even when one of the
+     * steps fails.
+     *
+     * @throws IOException if writing any of the files fails
+     */
+    @Override
+    public void close() throws IOException {
+        // Prepare files to merge
+        List<File> filesToMerge = new ArrayList<>();
+
+        try {
+            // Close the data and index streams; done inside the try so that a failing close
+            // still releases the output stream and the temporary files
+            for (DataOutputStream dos : dataStreams) {
+                if (dos != null) {
+                    dos.close();
+                }
+            }
+            for (DataOutputStream dos : indexStreams) {
+                if (dos != null) {
+                    dos.close();
+                }
+            }
+
+            // Stats
+            LOG.info("Number of keys: {}", keyCount);
+            LOG.info("Number of values: {}", valueCount);
+
+            // Write metadata file; the stream is closed even if writing fails
+            File metadataFile = new File(tempFolder, "metadata.dat");
+            metadataFile.deleteOnExit();
+            try (DataOutputStream metadataDataOutputStream =
+                    new DataOutputStream(new FileOutputStream(metadataFile))) {
+                writeMetadata(metadataDataOutputStream);
+            }
+            filesToMerge.add(metadataFile);
+
+            // Build index file
+            for (int i = 0; i < indexFiles.length; i++) {
+                if (indexFiles[i] != null) {
+                    filesToMerge.add(buildIndex(i));
+                }
+            }
+
+            // Stats collisions
+            LOG.info("Number of collisions: {}", collisions);
+
+            // Add data files
+            for (File dataFile : dataFiles) {
+                if (dataFile != null) {
+                    filesToMerge.add(dataFile);
+                }
+            }
+
+            // Merge and write to output
+            checkFreeDiskSpace(filesToMerge);
+            mergeFiles(filesToMerge, outputStream);
+        } finally {
+            try {
+                outputStream.close();
+            } finally {
+                // Must run even if closing the output failed
+                cleanup(filesToMerge);
+            }
+        }
+    }
+
+    /**
+     * Writes the metadata header: creation time, total key count, number of distinct key lengths,
+     * maximum key length, one record per key length (key length, key count, slot count, slot
+     * size, index offset, data offset) and finally the file positions of the index and data
+     * sections.
+     *
+     * <p>As a side effect {@code indexesLength} is increased by the index size of every key
+     * length.
+     *
+     * @throws IllegalStateException if a slot count or the index size does not fit into an int;
+     *     those values are stored as 32-bit integers and would otherwise be truncated silently
+     * @throws IOException if writing to the stream fails
+     */
+    private void writeMetadata(DataOutputStream dataOutputStream) throws IOException {
+        // Write time
+        dataOutputStream.writeLong(System.currentTimeMillis());
+
+        // Prepare
+        int keyLengthCount = getNumKeyCount();
+        int maxKeyLength = keyCounts.length - 1;
+
+        // Write size (number of keys)
+        dataOutputStream.writeInt(keyCount);
+
+        // Write the number of different key length
+        dataOutputStream.writeInt(keyLengthCount);
+
+        // Write the max value for keyLength
+        dataOutputStream.writeInt(maxKeyLength);
+
+        // For each keyLength
+        long datasLength = 0L;
+        for (int i = 0; i < keyCounts.length; i++) {
+            if (keyCounts[i] > 0) {
+                // Write the key length
+                dataOutputStream.writeInt(i);
+
+                // Write key count
+                dataOutputStream.writeInt(keyCounts[i]);
+
+                // Write slot count
+                long slotCount = Math.round(keyCounts[i] / loadFactor);
+                if (slotCount > Integer.MAX_VALUE) {
+                    throw new IllegalStateException(
+                            "Too many index slots ("
+                                    + slotCount
+                                    + ") for key length "
+                                    + i
+                                    + ", use a larger load factor");
+                }
+                int slots = (int) slotCount;
+                dataOutputStream.writeInt(slots);
+
+                // Write slot size
+                int offsetLength = maxOffsetLengths[i];
+                dataOutputStream.writeInt(i + offsetLength);
+
+                // Write index offset; it fits into an int thanks to the check below
+                dataOutputStream.writeInt((int) indexesLength);
+
+                // Increment index length
+                indexesLength += (long) (i + offsetLength) * slots;
+                if (indexesLength > Integer.MAX_VALUE) {
+                    throw new IllegalStateException(
+                            "Index size of " + indexesLength + " bytes exceeds the 2GB limit");
+                }
+
+                // Write data length
+                dataOutputStream.writeLong(datasLength);
+
+                // Increment data length
+                datasLength += dataLengths[i];
+            }
+        }
+
+        // Write the position of the index and the data; the index starts right behind the int
+        // and the long written here
+        int indexOffset =
+                dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + (Long.SIZE / Byte.SIZE);
+        dataOutputStream.writeInt(indexOffset);
+        dataOutputStream.writeLong(indexOffset + indexesLength);
+    }
+ /** Builds the hash index file for all keys of the given length and returns that file. */
+ private File buildIndex(int keyLength) throws IOException {
+ long count = keyCounts[keyLength];
+ int slots = (int) Math.round(count / loadFactor);
+ int offsetLength = maxOffsetLengths[keyLength];
+ int slotSize = keyLength + offsetLength;
+
+ // Init index: size the file to slots * slotSize bytes and map it read-write
+ File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
+ try (RandomAccessFile indexAccessFile = new
RandomAccessFile(indexFile, "rw")) {
+ indexAccessFile.setLength((long) slots * slotSize);
+ FileChannel indexChannel = indexAccessFile.getChannel();
+ MappedByteBuffer byteBuffer =
+ indexChannel.map(FileChannel.MapMode.READ_WRITE, 0,
indexAccessFile.length());
+
+ // Init reading stream over the temporary index file of this key length
+ File tempIndexFile = indexFiles[keyLength];
+ DataInputStream tempIndexStream =
+ new DataInputStream(
+ new BufferedInputStream(new
FileInputStream(tempIndexFile)));
+ try {
+ byte[] keyBuffer = new byte[keyLength];
+ byte[] slotBuffer = new byte[slotSize];
+ byte[] offsetBuffer = new byte[offsetLength];
+
+ // Read all keys and place each one into the first free slot of its probe sequence
+ for (int i = 0; i < count; i++) {
+ // Read key
+ tempIndexStream.readFully(keyBuffer);
+
+ // Read the var-length encoded data offset that follows the key
+ long offset =
VarLengthIntUtils.decodeLong(tempIndexStream);
+
+ // Hash of the key selects the first slot to try
+ long hash = MurmurHashUtils.hashBytesPositive(keyBuffer);
+
+ boolean collision = false;
+ for (int probe = 0; probe < count; probe++) {
+ int slot = (int) ((hash + probe) % slots);
+ byteBuffer.position(slot * slotSize);
+ byteBuffer.get(slotBuffer);
+
+ long found = VarLengthIntUtils.decodeLong(slotBuffer,
keyLength);
+ if (found == 0) {
+ // The slot is empty (decoded offset 0): store the key followed by its offset
+ byteBuffer.position(slot * slotSize);
+ byteBuffer.put(keyBuffer);
+ int pos =
VarLengthIntUtils.encodeLong(offsetBuffer, offset);
+ byteBuffer.put(offsetBuffer, 0, pos);
+ break;
+ } else {
+ collision = true;
+ // Slot taken: fail on a duplicate key, otherwise go on with the next probe
+ if (Arrays.equals(keyBuffer,
Arrays.copyOf(slotBuffer, keyLength))) {
+ throw new RuntimeException(
+ String.format(
+ "A duplicate key has been
found for for key bytes %s",
+ Arrays.toString(keyBuffer)));
+ }
+ }
+ }
+
+ if (collision) {
+ collisions++;
+ }
+ }
+
+ String msg =
+ " Max offset length: "
+ + offsetLength
+ + " bytes"
+ + "\n Slot size: "
+ + slotSize
+ + " bytes";
+
+ LOG.info("Built index file {}\n" + msg, indexFile.getName());
+ } finally {
+ // Close input
+ tempIndexStream.close();
+
+ // Close index and make sure resources are liberated
+ indexChannel.close();
+
+ // Delete temp index file, it is not needed once the final index has been built
+ if (tempIndexFile.delete()) {
+ LOG.info("Temporary index file {} has been deleted",
tempIndexFile.getName());
+ }
+ }
+ }
+
+ return indexFile;
+ }
+
+    /**
+     * Fails if the combined size of the given files reaches 66% of the usable disk space.
+     *
+     * <p>Only files that exist are counted. The usable space is the value reported by the last
+     * existing file in the list; if no file exists both values stay 0 and the check passes.
+     *
+     * @param inputFiles files that are about to be merged into the store file
+     * @throws RuntimeException if {@code totalSize / usableSpace >= 0.66}
+     */
+    private void checkFreeDiskSpace(List<File> inputFiles) {
+        // Check for free space
+        long usableSpace = 0;
+        long totalSize = 0;
+        for (File f : inputFiles) {
+            if (f.exists()) {
+                totalSize += f.length();
+                usableSpace = f.getUsableSpace();
+            }
+        }
+
+        // Divide as double: integer division would always print a ".0" fraction.
+        double bytesPerMb = 1024.0 * 1024.0;
+        DecimalFormat mbFormat = new DecimalFormat("#,##0.0");
+        LOG.info("Total expected store size is {} Mb", mbFormat.format(totalSize / bytesPerMb));
+        LOG.info(
+                "Usable free space on the system is {} Mb",
+                mbFormat.format(usableSpace / bytesPerMb));
+        if (totalSize / (double) usableSpace >= 0.66) {
+            throw new RuntimeException("Aborting because there isn't enough free disk space");
+        }
+    }
+
+    /**
+     * Appends the content of every existing input file, in list order, to the output stream.
+     *
+     * <p>Files that do not exist are skipped with a log message. The output stream is neither
+     * flushed nor closed here.
+     *
+     * @param inputFiles files to concatenate
+     * @param outputStream destination that receives the concatenated bytes
+     * @throws IOException if reading an input file or writing to the output fails
+     */
+    private void mergeFiles(List<File> inputFiles, OutputStream outputStream) throws IOException {
+        long startTime = System.nanoTime();
+
+        // Merge files
+        for (File f : inputFiles) {
+            if (f.exists()) {
+                // try-with-resources closes the stream even when the copy fails
+                try (BufferedInputStream bufferedInputStream =
+                        new BufferedInputStream(new FileInputStream(f))) {
+                    LOG.info("Merging {} size={}", f.getName(), f.length());
+
+                    byte[] buffer = new byte[8192];
+                    int length;
+                    while ((length = bufferedInputStream.read(buffer)) != -1) {
+                        outputStream.write(buffer, 0, length);
+                    }
+                }
+            } else {
+                LOG.info("Skip merging file {} because it doesn't exist", f.getName());
+            }
+        }
+
+        LOG.info("Time to merge {} s", ((System.nanoTime() - startTime) / 1000000000.0));
+    }
+
+    /**
+     * Deletes the given temporary files and then the temporary folder itself.
+     *
+     * <p>Deletion is best effort: a file or folder that cannot be deleted is left in place
+     * without any error being raised.
+     *
+     * @param inputFiles temporary files to remove
+     */
+    private void cleanup(List<File> inputFiles) {
+        for (File tempFile : inputFiles) {
+            if (tempFile.exists() && tempFile.delete()) {
+                LOG.info("Deleted temporary file {}", tempFile.getName());
+            }
+        }
+
+        boolean folderDeleted = tempFolder.delete();
+        if (folderDeleted) {
+            LOG.info("Deleted temporary folder at {}", tempFolder.getAbsolutePath());
+        }
+    }
+
+    /**
+     * Returns the data stream for the given key length, creating it on first use.
+     *
+     * <p>A new stream writes to {@code data<keyLength>.dat} in the temp folder and starts with
+     * one zero byte so that data offset 0 is never handed out.
+     *
+     * @param keyLength key length in bytes, used as the index into the per-length arrays
+     * @return the stream collecting the data for keys of that length
+     * @throws IOException if the data file cannot be opened or written
+     */
+    private DataOutputStream getDataStream(int keyLength) throws IOException {
+        // Grow the per-key-length arrays so that keyLength is a valid index
+        if (keyLength >= dataStreams.length) {
+            int capacity = keyLength + 1;
+            dataStreams = Arrays.copyOf(dataStreams, capacity);
+            dataFiles = Arrays.copyOf(dataFiles, capacity);
+        }
+
+        DataOutputStream existing = dataStreams[keyLength];
+        if (existing != null) {
+            return existing;
+        }
+
+        File dataFile = new File(tempFolder, "data" + keyLength + ".dat");
+        dataFile.deleteOnExit();
+        dataFiles[keyLength] = dataFile;
+
+        DataOutputStream created =
+                new DataOutputStream(new BufferedOutputStream(new FileOutputStream(dataFile)));
+        dataStreams[keyLength] = created;
+
+        // Write one byte so the zero offset is reserved
+        created.writeByte(0);
+        return created;
+    }
+
+    /**
+     * Returns the temporary index stream for the given key length, creating it on first use.
+     *
+     * <p>A new stream writes to {@code temp_index<keyLength>.dat} in the temp folder. All arrays
+     * indexed by key length are grown together so they stay the same size.
+     *
+     * @param keyLength key length in bytes, used as the index into the per-length arrays
+     * @return the stream collecting the index entries for keys of that length
+     * @throws IOException if the temporary index file cannot be opened
+     */
+    private DataOutputStream getIndexStream(int keyLength) throws IOException {
+        // Grow every per-key-length array so that keyLength is a valid index
+        if (keyLength >= indexStreams.length) {
+            int capacity = keyLength + 1;
+            indexStreams = Arrays.copyOf(indexStreams, capacity);
+            indexFiles = Arrays.copyOf(indexFiles, capacity);
+            keyCounts = Arrays.copyOf(keyCounts, capacity);
+            maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, capacity);
+            lastValues = Arrays.copyOf(lastValues, capacity);
+            lastValuesLength = Arrays.copyOf(lastValuesLength, capacity);
+            dataLengths = Arrays.copyOf(dataLengths, capacity);
+        }
+
+        DataOutputStream existing = indexStreams[keyLength];
+        if (existing != null) {
+            return existing;
+        }
+
+        File tempIndexFile = new File(tempFolder, "temp_index" + keyLength + ".dat");
+        tempIndexFile.deleteOnExit();
+        indexFiles[keyLength] = tempIndexFile;
+
+        DataOutputStream created =
+                new DataOutputStream(
+                        new BufferedOutputStream(new FileOutputStream(tempIndexFile)));
+        indexStreams[keyLength] = created;
+
+        // NOTE(review): presumably accounts for the reserved zero byte that getDataStream
+        // writes at the start of the matching data file - confirm against the put() path.
+        dataLengths[keyLength]++;
+        return created;
+    }
+
+    /**
+     * Counts the entries of {@code keyCounts} that are non-zero, i.e. the number of distinct key
+     * lengths for which at least one key has been recorded.
+     *
+     * @return number of key lengths in use
+     */
+    private int getNumKeyCount() {
+        int usedKeyLengths = 0;
+        for (int keysOfLength : keyCounts) {
+            usedKeyLengths += (keysOfLength != 0) ? 1 : 0;
+        }
+        return usedKeyLengths;
+    }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
new file mode 100644
index 00000000..9960495e
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/**
+ * A minimal forward-only reader over a byte array: it exposes {@code readUnsignedByte} and the
+ * current read position, and can be rewound to the start.
+ *
+ * <p>The array passed to the constructor is used directly, without copying.
+ */
+public final class SimpleReadBuffer {
+
+    /** Backing array; shared with the caller. */
+    private final byte[] bytes;
+
+    /** Index of the next byte to read. */
+    private int cursor;
+
+    /**
+     * Wraps the given array and positions the reader at index 0.
+     *
+     * @param data bytes to read from
+     */
+    public SimpleReadBuffer(byte[] data) {
+        this.bytes = data;
+        this.cursor = 0;
+    }
+
+    /** @return the backing array that was passed to the constructor. */
+    public byte[] getBuf() {
+        return this.bytes;
+    }
+
+    /** @return the index of the next byte that will be read. */
+    public int getPos() {
+        return this.cursor;
+    }
+
+    /**
+     * Moves the read position back to index 0.
+     *
+     * @return this buffer, to allow call chaining
+     */
+    public SimpleReadBuffer reset() {
+        this.cursor = 0;
+        return this;
+    }
+
+    /**
+     * Reads the byte at the current position as a value in {@code [0, 255]} and advances the
+     * position by one.
+     *
+     * @return the unsigned value of the byte that was read
+     */
+    public int readUnsignedByte() {
+        return Byte.toUnsignedInt(this.bytes[this.cursor++]);
+    }
+}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
new file mode 100644
index 00000000..a6180a80
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Utils for encoding int/long to var length bytes.
+ *
+ * <p>Values are written 7 bits at a time, least significant group first. The high bit (0x80) of
+ * each byte is set when more bytes follow and clear on the last byte. Only non-negative values
+ * can be encoded.
+ */
+public final class VarLengthIntUtils {
+
+    /** Static utility class, not meant to be instantiated. */
+    private VarLengthIntUtils() {}
+
+    /**
+     * Writes {@code value} to {@code os} in var-length form.
+     *
+     * @param os destination
+     * @param value non-negative value to encode
+     * @return bytes length.
+     * @throws IllegalArgumentException if {@code value} is negative
+     * @throws IOException if writing to {@code os} fails
+     */
+    public static int encodeLong(DataOutput os, long value) throws IOException {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        // Emit 7 bits per byte with the continuation bit set while higher bits remain
+        while ((value & ~0x7FL) != 0) {
+            os.write((((int) value & 0x7F) | 0x80));
+            value >>>= 7;
+            i++;
+        }
+        os.write((byte) value);
+        return i;
+    }
+
+    /**
+     * Writes {@code value} in var-length form into {@code bytes}, starting at index 0.
+     *
+     * @param bytes destination array; must be large enough for the encoded value
+     * @param value non-negative value to encode
+     * @return bytes length.
+     * @throws IllegalArgumentException if {@code value} is negative
+     */
+    public static int encodeLong(byte[] bytes, long value) {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7FL) != 0) {
+            bytes[i - 1] = (byte) (((int) value & 0x7F) | 0x80);
+            value >>>= 7;
+            i++;
+        }
+        bytes[i - 1] = (byte) value;
+        return i;
+    }
+
+    /**
+     * Reads a var-length encoded long from {@code is}.
+     *
+     * @param is source to read bytes from
+     * @return the decoded value
+     * @throws IOException if reading from {@code is} fails
+     * @throws Error if no terminating byte is found within 10 bytes
+     */
+    public static long decodeLong(DataInput is) throws IOException {
+
+        long result = 0;
+        for (int offset = 0; offset < 64; offset += 7) {
+            long b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed long.");
+    }
+
+    /**
+     * Reads a var-length encoded long from {@code ba}, starting at {@code index}.
+     *
+     * @param ba array holding the encoded value
+     * @param index position of the first encoded byte
+     * @return the decoded value
+     * @throws Error if no terminating byte is found within 10 bytes
+     */
+    public static long decodeLong(byte[] ba, int index) {
+        long result = 0;
+        for (int offset = 0; offset < 64; offset += 7) {
+            // The byte is sign-extended here; the masks below only look at its low 8 bits
+            long b = ba[index++];
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed long.");
+    }
+
+    /**
+     * Writes {@code value} to {@code os} in var-length form.
+     *
+     * @param os destination
+     * @param value non-negative value to encode
+     * @return bytes length.
+     * @throws IllegalArgumentException if {@code value} is negative
+     * @throws IOException if writing to {@code os} fails
+     */
+    public static int encodeInt(DataOutput os, int value) throws IOException {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7F) != 0) {
+            os.write(((value & 0x7F) | 0x80));
+            value >>>= 7;
+            i++;
+        }
+
+        os.write((byte) value);
+        return i;
+    }
+
+    /**
+     * Reads a var-length encoded int from {@code is}, advancing its position.
+     *
+     * @param is buffer to read bytes from
+     * @return the decoded value
+     * @throws Error if no terminating byte is found within 5 bytes
+     */
+    public static int decodeInt(SimpleReadBuffer is) {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+
+    /**
+     * Reads a var-length encoded int from {@code is}.
+     *
+     * @param is source to read bytes from
+     * @return the decoded value
+     * @throws IOException if reading from {@code is} fails
+     * @throws Error if no terminating byte is found within 5 bytes
+     */
+    public static int decodeInt(DataInput is) throws IOException {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+
+    /**
+     * Reads a var-length encoded int from {@code bb} at its current position, advancing it.
+     *
+     * @param bb buffer to read bytes from
+     * @return the decoded value
+     * @throws Error if no terminating byte is found within 5 bytes
+     */
+    public static int decodeInt(ByteBuffer bb) {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            // Mask to a single unsigned byte, consistent with the other decoders
+            int b = bb.get() & 0xff;
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+}
diff --git a/flink-table-store-common/src/main/resources/META-INF/NOTICE
b/flink-table-store-common/src/main/resources/META-INF/NOTICE
index bc13a1ce..ef711cfb 100644
--- a/flink-table-store-common/src/main/resources/META-INF/NOTICE
+++ b/flink-table-store-common/src/main/resources/META-INF/NOTICE
@@ -4,6 +4,10 @@ Copyright 2014-2022 The Apache Software Foundation
This product includes software developed at
The Apache Software Foundation (http://www.apache.org/).
+This project bundles the following dependencies under the Apache Software
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.linkedin.paldb:paldb:1.2.0
+
This project bundles the following dependencies under the BSD 3-clause license.
See bundled license files for details.
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
new file mode 100644
index 00000000..6c76f425
--- /dev/null
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link HashLookupStoreFactory}.
+ *
+ * <p>Each test writes a store into a fresh file under the JUnit temp directory and reads it back.
+ * Keys and values are converted to bytes through the UTF-8 encoding of their {@code toString()}.
+ */
+public class HashLookupStoreFactoryTest {
+
+    @TempDir Path tempDir;
+
+    private final RandomDataGenerator random = new RandomDataGenerator();
+
+    private File file;
+    private HashLookupStoreFactory factory;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        this.factory = new HashLookupStoreFactory(0.75d, true, MemorySize.ofMebiBytes(1024));
+        this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+        if (!file.createNewFile()) {
+            throw new IOException("Can not create file: " + file);
+        }
+    }
+
+    /** Encodes {@code o.toString()} as UTF-8 bytes. */
+    private byte[] toBytes(Object o) {
+        return toBytes(o.toString());
+    }
+
+    private byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testEmpty() throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(file);
+        writer.close();
+
+        assertThat(file.exists()).isTrue();
+
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        assertThat(reader.lookup(toBytes(1))).isNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testOneKey() throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(file);
+        writer.put(toBytes(1), toBytes("foo"));
+        writer.close();
+
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(1))).isEqualTo(toBytes("foo"));
+        reader.close();
+    }
+
+    @Test
+    public void testTwoFirstKeyLength() throws IOException {
+        int key1 = 1;
+        int key2 = 245;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(0))).isNull();
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(246))).isNull();
+        assertThat(reader.lookup(toBytes(1245))).isNull();
+        reader.close();
+    }
+
+    @Test
+    public void testKeyLengthGap() throws IOException {
+        int key1 = 1;
+        int key2 = 2450;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(0))).isNull();
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(267))).isNull();
+        assertThat(reader.lookup(toBytes(2449))).isNull();
+        assertThat(reader.lookup(toBytes(2451))).isNull();
+        assertThat(reader.lookup(toBytes(2454441))).isNull();
+        reader.close();
+    }
+
+    @Test
+    public void testKeyLengthStartTwo() throws IOException {
+        int key1 = 245;
+        int key2 = 2450;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(267))).isNull();
+        assertThat(reader.lookup(toBytes(2449))).isNull();
+        assertThat(reader.lookup(toBytes(2451))).isNull();
+        assertThat(reader.lookup(toBytes(2454441))).isNull();
+        reader.close();
+    }
+
+    @Test
+    public void testDataOnTwoBuffers() throws IOException {
+        Object[] keys = new Object[] {1, 2, 3};
+        Object[] values =
+                new Object[] {
+                    generateStringData(100), generateStringData(10000), generateStringData(100)
+                };
+
+        int byteSize = toBytes(values[0]).length + toBytes(values[1]).length;
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Read with a memory size smaller than the first two values together
+        factory = new HashLookupStoreFactory(0.75d, true, new MemorySize(byteSize - 100));
+        HashLookupStoreReader reader = factory.createReader(file);
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+        reader.close();
+    }
+
+    @Test
+    public void testDataSizeOnTwoBuffers() throws IOException {
+        Object[] keys = new Object[] {1, 2, 3};
+        Object[] values =
+                new Object[] {
+                    generateStringData(100), generateStringData(10000), generateStringData(100)
+                };
+
+        byte[] b1 = toBytes(values[0]);
+        byte[] b2 = toBytes(values[1]);
+        int byteSize = b1.length + b2.length;
+        // Number of bytes taken by the var-length encoded sizes of the first two values
+        int sizeSize =
+                VarLengthIntUtils.encodeInt(new DataOutputSerializer(4), b1.length)
+                        + VarLengthIntUtils.encodeInt(new DataOutputSerializer(4), b2.length);
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Read
+        factory =
+                new HashLookupStoreFactory(
+                        0.75d, true, new MemorySize(byteSize + sizeSize + 3));
+        HashLookupStoreReader reader = factory.createReader(file);
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+        reader.close();
+    }
+
+    @Test
+    public void testReadStringToString() throws IOException {
+        testReadKeyToString(generateStringKeys(100));
+    }
+
+    @Test
+    public void testReadIntToString() throws IOException {
+        testReadKeyToString(generateIntKeys(100));
+    }
+
+    @Test
+    public void testReadDoubleToString() throws IOException {
+        testReadKeyToString(generateDoubleKeys(100));
+    }
+
+    @Test
+    public void testReadLongToString() throws IOException {
+        testReadKeyToString(generateLongKeys(100));
+    }
+
+    @Test
+    public void testReadStringToInt() throws IOException {
+        testReadKeyToInt(generateStringKeys(100));
+    }
+
+    @Test
+    public void testReadByteToInt() throws IOException {
+        testReadKeyToInt(generateByteKeys(100));
+    }
+
+    @Test
+    public void testReadIntToInt() throws IOException {
+        testReadKeyToInt(generateIntKeys(100));
+    }
+
+    @Test
+    public void testReadCompoundToString() throws IOException {
+        testReadKeyToString(generateCompoundKeys(100));
+    }
+
+    @Test
+    public void testReadCompoundByteToString() throws IOException {
+        testReadKeyToString(new Object[] {generateCompoundByteKey()});
+    }
+
+    @Test
+    public void testReadDisk() throws IOException {
+        Integer[] keys = generateIntKeys(10000);
+
+        // Write
+        Object[] values = generateStringData(keys.length, 1000);
+        writeStore(file, keys, values);
+
+        // Read
+        factory = new HashLookupStoreFactory(0.75d, false, MemorySize.ofMebiBytes(1024));
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+        reader.close();
+    }
+
+    @Test
+    public void testIterate() throws IOException {
+        Integer[] keys = generateIntKeys(100);
+        String[] values = generateStringData(keys.length, 12);
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Sets
+        Set<Integer> keysSet = new HashSet<>(Arrays.asList(keys));
+        Set<String> valuesSet = new HashSet<>(Arrays.asList(values));
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        Iterator<Map.Entry<byte[], byte[]>> itr = reader.iterator();
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(itr.hasNext()).isTrue();
+            Map.Entry<byte[], byte[]> entry = itr.next();
+            assertThat(entry).isNotNull();
+            // Decode with the same charset that toBytes() used for encoding
+            String key = new String(entry.getKey(), StandardCharsets.UTF_8);
+            String value = new String(entry.getValue(), StandardCharsets.UTF_8);
+            assertThat(keysSet.remove(Integer.valueOf(key))).isTrue();
+            assertThat(valuesSet.remove(value)).isTrue();
+
+            assertThat(reader.lookup(entry.getKey())).isEqualTo(entry.getValue());
+        }
+        assertThat(itr.hasNext()).isFalse();
+        reader.close();
+
+        assertThat(keysSet).isEmpty();
+        assertThat(valuesSet).isEmpty();
+    }
+
+    // UTILITY
+
+    /** Writes the keys with random 10-letter hex string values and checks every lookup. */
+    private void testReadKeyToString(Object[] keys) throws IOException {
+        // Write
+        Object[] values = generateStringData(keys.length, 10);
+        writeStore(file, keys, values);
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+
+        reader.close();
+    }
+
+    /** Writes the keys with pseudo-random integer values and checks every lookup. */
+    private void testReadKeyToInt(Object[] keys) throws IOException {
+        // Write
+        Integer[] values = generateIntData(keys.length);
+        writeStore(file, keys, values);
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+
+        reader.close();
+    }
+
+    /** Writes {@code keys[i] -> values[i]} for every index into a new store at location. */
+    private void writeStore(File location, Object[] keys, Object[] values) throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(location);
+        for (int i = 0; i < keys.length; i++) {
+            writer.put(toBytes(keys[i]), toBytes(values[i]));
+        }
+        writer.close();
+    }
+
+    private Integer[] generateIntKeys(int count) {
+        Integer[] res = new Integer[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = i;
+        }
+        return res;
+    }
+
+    private String[] generateStringKeys(int count) {
+        String[] res = new String[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = i + "";
+        }
+        return res;
+    }
+
+    private Byte[] generateByteKeys(int count) {
+        if (count > 127) {
+            throw new RuntimeException("Too large range");
+        }
+        Byte[] res = new Byte[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (byte) i;
+        }
+        return res;
+    }
+
+    private Double[] generateDoubleKeys(int count) {
+        Double[] res = new Double[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (double) i;
+        }
+        return res;
+    }
+
+    private Long[] generateLongKeys(int count) {
+        Long[] res = new Long[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (long) i;
+        }
+        return res;
+    }
+
+    // NOTE: each compound key is an Object[]; toBytes() turns it into bytes via
+    // Object#toString(), which for arrays does not reflect the array contents.
+    private Object[] generateCompoundKeys(int count) {
+        Object[] res = new Object[count];
+        Random random = new Random(345);
+        for (int i = 0; i < count; i++) {
+            Object[] k = new Object[] {(byte) random.nextInt(10), i};
+            res[i] = k;
+        }
+        return res;
+    }
+
+    private Object[] generateCompoundByteKey() {
+        Object[] res = new Object[2];
+        res[0] = (byte) 6;
+        res[1] = (byte) 0;
+        return res;
+    }
+
+    private String generateStringData(int letters) {
+        return random.nextHexString(letters);
+    }
+
+    private String[] generateStringData(int count, int letters) {
+        String[] res = new String[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = random.nextHexString(letters);
+        }
+        return res;
+    }
+
+    private Integer[] generateIntData(int count) {
+        Integer[] res = new Integer[count];
+        Random random = new Random(count + 34593263544354353L);
+        for (int i = 0; i < count; i++) {
+            res[i] = random.nextInt(1000000);
+        }
+        return res;
+    }
+}