This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 0f5744a9 [FLINK-31397] Introduce write-once hash lookup store
0f5744a9 is described below

commit 0f5744a92bc52f52bc0f6644dc063dcb60fe326c
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Mar 14 11:15:55 2023 +0800

    [FLINK-31397] Introduce write-once hash lookup store
    
    This closes #593
---
 .../table/store/lookup/LookupStoreFactory.java     |  38 ++
 .../table/store/lookup/LookupStoreReader.java      |  29 ++
 .../table/store/lookup/LookupStoreWriter.java      |  29 ++
 .../store/lookup/hash/HashLookupStoreFactory.java  |  49 +++
 .../store/lookup/hash/HashLookupStoreReader.java   | 455 +++++++++++++++++++
 .../store/lookup/hash/HashLookupStoreWriter.java   | 486 +++++++++++++++++++++
 .../flink/table/store/utils/SimpleReadBuffer.java  |  43 ++
 .../flink/table/store/utils/VarLengthIntUtils.java | 134 ++++++
 .../src/main/resources/META-INF/NOTICE             |   4 +
 .../lookup/hash/HashLookupStoreFactoryTest.java    | 421 ++++++++++++++++++
 10 files changed, 1688 insertions(+)

diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
new file mode 100644
index 00000000..88d2b55d
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.File;
+import java.io.IOException;
+
+/**
+ * A key-value store for lookup, key-value store should be single binary file 
written once and ready
+ * to be used. This factory provide two interfaces:
+ *
+ * <ul>
+ *   <li>Writer: written once to prepare binary file.
+ *   <li>Reader: lookup value by key bytes.
+ * </ul>
+ */
+public interface LookupStoreFactory {
+
+    LookupStoreWriter createWriter(File file) throws IOException;
+
+    LookupStoreReader createReader(File file) throws IOException;
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
new file mode 100644
index 00000000..2ceaf0e9
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreReader.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Reader, lookup value by key bytes. */
+public interface LookupStoreReader extends Closeable {
+
+    /** Lookup value by key. */
+    byte[] lookup(byte[] key) throws IOException;
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
new file mode 100644
index 00000000..03fe86a5
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/LookupStoreWriter.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/** Writer to prepare binary file. */
+public interface LookupStoreWriter extends Closeable {
+
+    /** Put key value to store. */
+    void put(byte[] key, byte[] value) throws IOException;
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
new file mode 100644
index 00000000..895ab08e
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreFactory;
+import org.apache.flink.table.store.options.MemorySize;
+
+import java.io.File;
+import java.io.IOException;
+
+/** A {@link LookupStoreFactory} which uses hash to lookup records on disk. */
+public class HashLookupStoreFactory implements LookupStoreFactory {
+
+    private final double loadFactor;
+    private final boolean useMmp;
+    private final MemorySize mmpSegmentSize;
+
+    public HashLookupStoreFactory(double loadFactor, boolean useMmp, 
MemorySize mmpSegmentSize) {
+        this.loadFactor = loadFactor;
+        this.useMmp = useMmp;
+        this.mmpSegmentSize = mmpSegmentSize;
+    }
+
+    @Override
+    public HashLookupStoreWriter createWriter(File file) throws IOException {
+        return new HashLookupStoreWriter(loadFactor, file);
+    }
+
+    @Override
+    public HashLookupStoreReader createReader(File file) throws IOException {
+        return new HashLookupStoreReader(useMmp, mmpSegmentSize.getBytes(), 
file);
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
new file mode 100644
index 00000000..390d599b
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
@@ -0,0 +1,455 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.SimpleReadBuffer;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.text.SimpleDateFormat;
+import java.util.Arrays;
+import java.util.Calendar;
+import java.util.Iterator;
+import java.util.Map;
+
+/** Internal read implementation for hash kv store. */
+public class HashLookupStoreReader
+        implements LookupStoreReader, Iterable<Map.Entry<byte[], byte[]>> {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HashLookupStoreReader.class.getName());
+
+    // Buffer segment size
+    private final long mmpSegmentSize;
+    // Key count for each key length
+    private final int[] keyCounts;
+    // Slot size for each key length
+    private final int[] slotSizes;
+    // Number of slots for each key length
+    private final int[] slots;
+    // Offset of the index for different key length
+    private final int[] indexOffsets;
+    // Offset of the data in the channel
+    private final long dataOffset;
+    // Offset of the data for different key length
+    private final long[] dataOffsets;
+    // Data size
+    private final long dataSize;
+    // Index and data buffers
+    private MappedByteBuffer indexBuffer;
+    private MappedByteBuffer[] dataBuffers;
+    // FileChannel
+    private RandomAccessFile mappedFile;
+    private FileChannel channel;
+    // Use MMap for data?
+    private final boolean mMapData;
+    // Buffers
+    private final SimpleReadBuffer sizeBuffer = new SimpleReadBuffer(new 
byte[5]);
+    private final byte[] slotBuffer;
+
+    HashLookupStoreReader(boolean useMmp, long mmpSegmentSize, File file) 
throws IOException {
+        // File path
+        if (!file.exists()) {
+            throw new FileNotFoundException("File " + file.getAbsolutePath() + 
" not found");
+        }
+        LOG.info("Opening file {}", file.getName());
+
+        this.mmpSegmentSize = mmpSegmentSize;
+        // Check valid segmentSize
+        if (this.mmpSegmentSize > Integer.MAX_VALUE) {
+            throw new IllegalArgumentException("The mmpSegmentSize can't be 
larger than 2GB");
+        }
+
+        // Open file and read metadata
+        long createdAt;
+        FileInputStream inputStream = new FileInputStream(file);
+        DataInputStream dataInputStream = new DataInputStream(new 
BufferedInputStream(inputStream));
+        // Offset of the index in the channel
+        int indexOffset;
+        int keyCount;
+        try {
+            // Time
+            createdAt = dataInputStream.readLong();
+
+            // Metadata counters
+            keyCount = dataInputStream.readInt();
+            // Number of different key length
+            int keyLengthCount = dataInputStream.readInt();
+            // Max key length
+            int maxKeyLength = dataInputStream.readInt();
+
+            // Read offset counts and keys
+            indexOffsets = new int[maxKeyLength + 1];
+            dataOffsets = new long[maxKeyLength + 1];
+            keyCounts = new int[maxKeyLength + 1];
+            slots = new int[maxKeyLength + 1];
+            slotSizes = new int[maxKeyLength + 1];
+
+            int maxSlotSize = 0;
+            for (int i = 0; i < keyLengthCount; i++) {
+                int keyLength = dataInputStream.readInt();
+
+                keyCounts[keyLength] = dataInputStream.readInt();
+                slots[keyLength] = dataInputStream.readInt();
+                slotSizes[keyLength] = dataInputStream.readInt();
+                indexOffsets[keyLength] = dataInputStream.readInt();
+                dataOffsets[keyLength] = dataInputStream.readLong();
+
+                maxSlotSize = Math.max(maxSlotSize, slotSizes[keyLength]);
+            }
+
+            slotBuffer = new byte[maxSlotSize];
+
+            // Read index and data offset
+            indexOffset = dataInputStream.readInt();
+            dataOffset = dataInputStream.readLong();
+        } finally {
+            // Close metadata
+            dataInputStream.close();
+            inputStream.close();
+        }
+
+        // Create Mapped file in read-only mode
+        mappedFile = new RandomAccessFile(file, "r");
+        channel = mappedFile.getChannel();
+        long fileSize = file.length();
+
+        // Create index buffer
+        indexBuffer =
+                channel.map(FileChannel.MapMode.READ_ONLY, indexOffset, 
dataOffset - indexOffset);
+
+        // Create data buffers
+        dataSize = fileSize - dataOffset;
+
+        // Check if data size fits in memory map limit
+        if (!useMmp) {
+            // Use classical disk read
+            mMapData = false;
+            dataBuffers = null;
+        } else {
+            // Use Mmap
+            mMapData = true;
+
+            // Build data buffers
+            int bufArraySize =
+                    (int) (dataSize / this.mmpSegmentSize)
+                            + ((dataSize % this.mmpSegmentSize != 0) ? 1 : 0);
+            dataBuffers = new MappedByteBuffer[bufArraySize];
+            int bufIdx = 0;
+            for (long offset = 0; offset < dataSize; offset += 
this.mmpSegmentSize) {
+                long remainingFileSize = dataSize - offset;
+                long thisSegmentSize = Math.min(this.mmpSegmentSize, 
remainingFileSize);
+                dataBuffers[bufIdx++] =
+                        channel.map(
+                                FileChannel.MapMode.READ_ONLY,
+                                dataOffset + offset,
+                                thisSegmentSize);
+            }
+        }
+
+        // logging
+        DecimalFormat integerFormat = new DecimalFormat("#,##0.00");
+        StringBuilder statMsg = new StringBuilder("Storage metadata\n");
+        statMsg.append("  Created at: 
").append(formatCreatedAt(createdAt)).append("\n");
+        statMsg.append("  Key count: ").append(keyCount).append("\n");
+        for (int i = 0; i < keyCounts.length; i++) {
+            if (keyCounts[i] > 0) {
+                statMsg.append("  Key count for key length ")
+                        .append(i)
+                        .append(": ")
+                        .append(keyCounts[i])
+                        .append("\n");
+            }
+        }
+        statMsg.append("  Index size: ")
+                .append(integerFormat.format((dataOffset - indexOffset) / 
(1024.0 * 1024.0)))
+                .append(" Mb\n");
+        statMsg.append("  Data size: ")
+                .append(integerFormat.format((fileSize - dataOffset) / (1024.0 
* 1024.0)))
+                .append(" Mb\n");
+        if (mMapData) {
+            statMsg.append("  Number of memory mapped data buffers: 
").append(dataBuffers.length);
+        } else {
+            statMsg.append("  Memory mapped data disabled, using disk");
+        }
+        LOG.info(statMsg.toString());
+    }
+
+    @Override
+    public byte[] lookup(byte[] key) throws IOException {
+        int keyLength = key.length;
+        if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
+            return null;
+        }
+        long hash = MurmurHashUtils.hashBytesPositive(key);
+        int numSlots = slots[keyLength];
+        int slotSize = slotSizes[keyLength];
+        int indexOffset = indexOffsets[keyLength];
+        long dataOffset = dataOffsets[keyLength];
+
+        for (int probe = 0; probe < numSlots; probe++) {
+            int slot = (int) ((hash + probe) % numSlots);
+            indexBuffer.position(indexOffset + slot * slotSize);
+            indexBuffer.get(slotBuffer, 0, slotSize);
+
+            long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength);
+            if (offset == 0) {
+                return null;
+            }
+            if (isKey(slotBuffer, key)) {
+                return mMapData
+                        ? getMMapBytes(dataOffset + offset)
+                        : getDiskBytes(dataOffset + offset);
+            }
+        }
+        return null;
+    }
+
+    private boolean isKey(byte[] slotBuffer, byte[] key) {
+        for (int i = 0; i < key.length; i++) {
+            if (slotBuffer[i] != key[i]) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    // Read the data at the given offset, the data can be spread over multiple 
data buffers
+    private byte[] getMMapBytes(long offset) {
+        // Read the first 4 bytes to get the size of the data
+        ByteBuffer buf = getDataBuffer(offset);
+        int maxLen = (int) Math.min(5, dataSize - offset);
+
+        int size;
+        if (buf.remaining() >= maxLen) {
+            // Continuous read
+            int pos = buf.position();
+            size = VarLengthIntUtils.decodeInt(buf);
+
+            // Used in case of data is spread over multiple buffers
+            offset += buf.position() - pos;
+        } else {
+            // The size of the data is spread over multiple buffers
+            int len = maxLen;
+            int off = 0;
+            sizeBuffer.reset();
+            while (len > 0) {
+                buf = getDataBuffer(offset + off);
+                int count = Math.min(len, buf.remaining());
+                buf.get(sizeBuffer.getBuf(), off, count);
+                off += count;
+                len -= count;
+            }
+            size = VarLengthIntUtils.decodeInt(sizeBuffer);
+            offset += sizeBuffer.getPos();
+            buf = getDataBuffer(offset);
+        }
+
+        // Create output bytes
+        byte[] res = new byte[size];
+
+        // Check if the data is one buffer
+        if (buf.remaining() >= size) {
+            // Continuous read
+            buf.get(res, 0, size);
+        } else {
+            int len = size;
+            int off = 0;
+            while (len > 0) {
+                buf = getDataBuffer(offset);
+                int count = Math.min(len, buf.remaining());
+                buf.get(res, off, count);
+                offset += count;
+                off += count;
+                len -= count;
+            }
+        }
+
+        return res;
+    }
+
+    // Get data from disk
+    private byte[] getDiskBytes(long offset) throws IOException {
+        mappedFile.seek(dataOffset + offset);
+
+        // Get size of data
+        int size = VarLengthIntUtils.decodeInt(mappedFile);
+
+        // Create output bytes
+        byte[] res = new byte[size];
+
+        // Read data
+        if (mappedFile.read(res) == -1) {
+            throw new EOFException();
+        }
+
+        return res;
+    }
+
+    // Return the data buffer for the given position
+    private ByteBuffer getDataBuffer(long index) {
+        ByteBuffer buf = dataBuffers[(int) (index / mmpSegmentSize)];
+        buf.position((int) (index % mmpSegmentSize));
+        return buf;
+    }
+
+    private String formatCreatedAt(long createdAt) {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at' 
HH:mm:ss z");
+        Calendar cl = Calendar.getInstance();
+        cl.setTimeInMillis(createdAt);
+        return sdf.format(cl.getTime());
+    }
+
+    @Override
+    public void close() throws IOException {
+        channel.close();
+        mappedFile.close();
+        indexBuffer = null;
+        dataBuffers = null;
+        mappedFile = null;
+        channel = null;
+    }
+
+    @Override
+    public Iterator<Map.Entry<byte[], byte[]>> iterator() {
+        return new StorageIterator(true);
+    }
+
+    public Iterator<Map.Entry<byte[], byte[]>> keys() {
+        return new StorageIterator(false);
+    }
+
+    private class StorageIterator implements Iterator<Map.Entry<byte[], 
byte[]>> {
+
+        private final FastEntry entry = new FastEntry();
+        private final boolean withValue;
+        private int currentKeyLength = 0;
+        private byte[] currentSlotBuffer;
+        private long keyIndex;
+        private long keyLimit;
+        private long currentDataOffset;
+        private int currentIndexOffset;
+
+        public StorageIterator(boolean value) {
+            withValue = value;
+            nextKeyLength();
+        }
+
+        private void nextKeyLength() {
+            for (int i = currentKeyLength + 1; i < keyCounts.length; i++) {
+                long c = keyCounts[i];
+                if (c > 0) {
+                    currentKeyLength = i;
+                    keyLimit += c;
+                    currentSlotBuffer = new byte[slotSizes[i]];
+                    currentIndexOffset = indexOffsets[i];
+                    currentDataOffset = dataOffsets[i];
+                    break;
+                }
+            }
+        }
+
+        @Override
+        public boolean hasNext() {
+            return keyIndex < keyLimit;
+        }
+
+        @Override
+        public FastEntry next() {
+            try {
+                indexBuffer.position(currentIndexOffset);
+
+                long offset = 0;
+                while (offset == 0) {
+                    indexBuffer.get(currentSlotBuffer);
+                    offset = VarLengthIntUtils.decodeLong(currentSlotBuffer, 
currentKeyLength);
+                    currentIndexOffset += currentSlotBuffer.length;
+                }
+
+                byte[] key = Arrays.copyOf(currentSlotBuffer, 
currentKeyLength);
+                byte[] value = null;
+
+                if (withValue) {
+                    long valueOffset = currentDataOffset + offset;
+                    value = mMapData ? getMMapBytes(valueOffset) : 
getDiskBytes(valueOffset);
+                }
+
+                entry.set(key, value);
+
+                if (++keyIndex == keyLimit) {
+                    nextKeyLength();
+                }
+                return entry;
+            } catch (IOException ex) {
+                throw new RuntimeException(ex);
+            }
+        }
+
+        @Override
+        public void remove() {
+            throw new UnsupportedOperationException("Not supported yet.");
+        }
+
+        private class FastEntry implements Map.Entry<byte[], byte[]> {
+
+            private byte[] key;
+            private byte[] val;
+
+            protected void set(byte[] k, byte[] v) {
+                this.key = k;
+                this.val = v;
+            }
+
+            @Override
+            public byte[] getKey() {
+                return key;
+            }
+
+            @Override
+            public byte[] getValue() {
+                return val;
+            }
+
+            @Override
+            public byte[] setValue(byte[] value) {
+                throw new UnsupportedOperationException("Not supported.");
+            }
+        }
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
new file mode 100644
index 00000000..2e0be801
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreWriter.java
@@ -0,0 +1,486 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.utils.MurmurHashUtils;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.RandomAccessFile;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.UUID;
+
+/** Internal write implementation for hash kv store. */
+public class HashLookupStoreWriter implements LookupStoreWriter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(HashLookupStoreWriter.class.getName());
+
+    // load factor of hash map, default 0.75
+    private final double loadFactor;
+    // Output
+    private final File tempFolder;
+    private final OutputStream outputStream;
+    // Index stream
+    private File[] indexFiles;
+    private DataOutputStream[] indexStreams;
+    // Data stream
+    private File[] dataFiles;
+    private DataOutputStream[] dataStreams;
+    // Cache last value
+    private byte[][] lastValues;
+    private int[] lastValuesLength;
+    // Data length
+    private long[] dataLengths;
+    // Index length
+    private long indexesLength;
+    // Max offset length
+    private int[] maxOffsetLengths;
+    // Number of keys
+    private int keyCount;
+    private int[] keyCounts;
+    // Number of values
+    private int valueCount;
+    // Number of collisions
+    private int collisions;
+
+    HashLookupStoreWriter(double loadFactor, File file) throws IOException {
+        this.loadFactor = loadFactor;
+        if (loadFactor <= 0.0 || loadFactor >= 1.0) {
+            throw new IllegalArgumentException(
+                    "Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
+        }
+
+        tempFolder = new File(file.getParentFile(), 
UUID.randomUUID().toString());
+        if (!tempFolder.mkdir()) {
+            throw new IOException("Can not create temp folder: " + tempFolder);
+        }
+        outputStream = new BufferedOutputStream(new FileOutputStream(file));
+        indexStreams = new DataOutputStream[0];
+        dataStreams = new DataOutputStream[0];
+        indexFiles = new File[0];
+        dataFiles = new File[0];
+        lastValues = new byte[0][];
+        lastValuesLength = new int[0];
+        dataLengths = new long[0];
+        maxOffsetLengths = new int[0];
+        keyCounts = new int[0];
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        int keyLength = key.length;
+
+        // Get the Output stream for that keyLength, each key length has its 
own file
+        DataOutputStream indexStream = getIndexStream(keyLength);
+
+        // Write key
+        indexStream.write(key);
+
+        // Check if the value is identical to the last inserted
+        byte[] lastValue = lastValues[keyLength];
+        boolean sameValue = lastValue != null && Arrays.equals(value, 
lastValue);
+
+        // Get data stream and length
+        long dataLength = dataLengths[keyLength];
+        if (sameValue) {
+            dataLength -= lastValuesLength[keyLength];
+        }
+
+        // Write offset and record max offset length
+        int offsetLength = VarLengthIntUtils.encodeLong(indexStream, 
dataLength);
+        maxOffsetLengths[keyLength] = Math.max(offsetLength, 
maxOffsetLengths[keyLength]);
+
+        // Write if data is not the same
+        if (!sameValue) {
+            // Get stream
+            DataOutputStream dataStream = getDataStream(keyLength);
+
+            // Write size and value
+            int valueSize = VarLengthIntUtils.encodeInt(dataStream, 
value.length);
+            dataStream.write(value);
+
+            // Update data length
+            dataLengths[keyLength] += valueSize + value.length;
+
+            // Update last value
+            lastValues[keyLength] = value;
+            lastValuesLength[keyLength] = valueSize + value.length;
+
+            valueCount++;
+        }
+
+        keyCount++;
+        keyCounts[keyLength]++;
+    }
+
+    @Override
+    public void close() throws IOException {
+        // Close the data and index streams
+        for (DataOutputStream dos : dataStreams) {
+            if (dos != null) {
+                dos.close();
+            }
+        }
+        for (DataOutputStream dos : indexStreams) {
+            if (dos != null) {
+                dos.close();
+            }
+        }
+
+        // Stats
+        LOG.info("Number of keys: {}", keyCount);
+        LOG.info("Number of values: {}", valueCount);
+
+        // Prepare files to merge
+        List<File> filesToMerge = new ArrayList<>();
+
+        try {
+
+            // Write metadata file
+            File metadataFile = new File(tempFolder, "metadata.dat");
+            metadataFile.deleteOnExit();
+            FileOutputStream metadataOututStream = new 
FileOutputStream(metadataFile);
+            DataOutputStream metadataDataOutputStream = new 
DataOutputStream(metadataOututStream);
+            writeMetadata(metadataDataOutputStream);
+            metadataDataOutputStream.close();
+            metadataOututStream.close();
+            filesToMerge.add(metadataFile);
+
+            // Build index file
+            for (int i = 0; i < indexFiles.length; i++) {
+                if (indexFiles[i] != null) {
+                    filesToMerge.add(buildIndex(i));
+                }
+            }
+
+            // Stats collisions
+            LOG.info("Number of collisions: {}", collisions);
+
+            // Add data files
+            for (File dataFile : dataFiles) {
+                if (dataFile != null) {
+                    filesToMerge.add(dataFile);
+                }
+            }
+
+            // Merge and write to output
+            checkFreeDiskSpace(filesToMerge);
+            mergeFiles(filesToMerge, outputStream);
+        } finally {
+            outputStream.close();
+            cleanup(filesToMerge);
+        }
+    }
+
+    private void writeMetadata(DataOutputStream dataOutputStream) throws 
IOException {
+        // Write time
+        dataOutputStream.writeLong(System.currentTimeMillis());
+
+        // Prepare
+        int keyLengthCount = getNumKeyCount();
+        int maxKeyLength = keyCounts.length - 1;
+
+        // Write size (number of keys)
+        dataOutputStream.writeInt(keyCount);
+
+        // Write the number of different key length
+        dataOutputStream.writeInt(keyLengthCount);
+
+        // Write the max value for keyLength
+        dataOutputStream.writeInt(maxKeyLength);
+
+        // For each keyLength
+        long datasLength = 0L;
+        for (int i = 0; i < keyCounts.length; i++) {
+            if (keyCounts[i] > 0) {
+                // Write the key length
+                dataOutputStream.writeInt(i);
+
+                // Write key count
+                dataOutputStream.writeInt(keyCounts[i]);
+
+                // Write slot count
+                int slots = (int) Math.round(keyCounts[i] / loadFactor);
+                dataOutputStream.writeInt(slots);
+
+                // Write slot size
+                int offsetLength = maxOffsetLengths[i];
+                dataOutputStream.writeInt(i + offsetLength);
+
+                // Write index offset
+                dataOutputStream.writeInt((int) indexesLength);
+
+                // Increment index length
+                indexesLength += (long) (i + offsetLength) * slots;
+
+                // Write data length
+                dataOutputStream.writeLong(datasLength);
+
+                // Increment data length
+                datasLength += dataLengths[i];
+            }
+        }
+
+        // Write the position of the index and the data
+        int indexOffset =
+                dataOutputStream.size() + (Integer.SIZE / Byte.SIZE) + 
(Long.SIZE / Byte.SIZE);
+        dataOutputStream.writeInt(indexOffset);
+        dataOutputStream.writeLong(indexOffset + indexesLength);
+    }
+
+    private File buildIndex(int keyLength) throws IOException {
+        long count = keyCounts[keyLength];
+        int slots = (int) Math.round(count / loadFactor);
+        int offsetLength = maxOffsetLengths[keyLength];
+        int slotSize = keyLength + offsetLength;
+
+        // Init index
+        File indexFile = new File(tempFolder, "index" + keyLength + ".dat");
+        try (RandomAccessFile indexAccessFile = new 
RandomAccessFile(indexFile, "rw")) {
+            indexAccessFile.setLength((long) slots * slotSize);
+            FileChannel indexChannel = indexAccessFile.getChannel();
+            MappedByteBuffer byteBuffer =
+                    indexChannel.map(FileChannel.MapMode.READ_WRITE, 0, 
indexAccessFile.length());
+
+            // Init reading stream
+            File tempIndexFile = indexFiles[keyLength];
+            DataInputStream tempIndexStream =
+                    new DataInputStream(
+                            new BufferedInputStream(new 
FileInputStream(tempIndexFile)));
+            try {
+                byte[] keyBuffer = new byte[keyLength];
+                byte[] slotBuffer = new byte[slotSize];
+                byte[] offsetBuffer = new byte[offsetLength];
+
+                // Read all keys
+                for (int i = 0; i < count; i++) {
+                    // Read key
+                    tempIndexStream.readFully(keyBuffer);
+
+                    // Read offset
+                    long offset = 
VarLengthIntUtils.decodeLong(tempIndexStream);
+
+                    // Hash
+                    long hash = MurmurHashUtils.hashBytesPositive(keyBuffer);
+
+                    boolean collision = false;
+                    for (int probe = 0; probe < count; probe++) {
+                        int slot = (int) ((hash + probe) % slots);
+                        byteBuffer.position(slot * slotSize);
+                        byteBuffer.get(slotBuffer);
+
+                        long found = VarLengthIntUtils.decodeLong(slotBuffer, 
keyLength);
+                        if (found == 0) {
+                            // The spot is empty use it
+                            byteBuffer.position(slot * slotSize);
+                            byteBuffer.put(keyBuffer);
+                            int pos = 
VarLengthIntUtils.encodeLong(offsetBuffer, offset);
+                            byteBuffer.put(offsetBuffer, 0, pos);
+                            break;
+                        } else {
+                            collision = true;
+                            // Check for duplicates
+                            if (Arrays.equals(keyBuffer, 
Arrays.copyOf(slotBuffer, keyLength))) {
+                                throw new RuntimeException(
+                                        String.format(
+                                                "A duplicate key has been 
found for for key bytes %s",
+                                                Arrays.toString(keyBuffer)));
+                            }
+                        }
+                    }
+
+                    if (collision) {
+                        collisions++;
+                    }
+                }
+
+                String msg =
+                        "  Max offset length: "
+                                + offsetLength
+                                + " bytes"
+                                + "\n  Slot size: "
+                                + slotSize
+                                + " bytes";
+
+                LOG.info("Built index file {}\n" + msg, indexFile.getName());
+            } finally {
+                // Close input
+                tempIndexStream.close();
+
+                // Close index and make sure resources are liberated
+                indexChannel.close();
+
+                // Delete temp index file
+                if (tempIndexFile.delete()) {
+                    LOG.info("Temporary index file {} has been deleted", 
tempIndexFile.getName());
+                }
+            }
+        }
+
+        return indexFile;
+    }
+
+    // Fail if the size of the expected store file exceed 2/3rd of the free 
disk space
+    private void checkFreeDiskSpace(List<File> inputFiles) {
+        // Check for free space
+        long usableSpace = 0;
+        long totalSize = 0;
+        for (File f : inputFiles) {
+            if (f.exists()) {
+                totalSize += f.length();
+                usableSpace = f.getUsableSpace();
+            }
+        }
+        LOG.info(
+                "Total expected store size is {} Mb",
+                new DecimalFormat("#,##0.0").format(totalSize / (1024 * 
1024)));
+        LOG.info(
+                "Usable free space on the system is {} Mb",
+                new DecimalFormat("#,##0.0").format(usableSpace / (1024 * 
1024)));
+        if (totalSize / (double) usableSpace >= 0.66) {
+            throw new RuntimeException("Aborting because there isn' enough 
free disk space");
+        }
+    }
+
+    // Merge files to the provided fileChannel
+    private void mergeFiles(List<File> inputFiles, OutputStream outputStream) 
throws IOException {
+        long startTime = System.nanoTime();
+
+        // Merge files
+        for (File f : inputFiles) {
+            if (f.exists()) {
+                FileInputStream fileInputStream = new FileInputStream(f);
+                BufferedInputStream bufferedInputStream = new 
BufferedInputStream(fileInputStream);
+                try {
+                    LOG.info("Merging {} size={}", f.getName(), f.length());
+
+                    byte[] buffer = new byte[8192];
+                    int length;
+                    while ((length = bufferedInputStream.read(buffer)) > 0) {
+                        outputStream.write(buffer, 0, length);
+                    }
+                } finally {
+                    bufferedInputStream.close();
+                    fileInputStream.close();
+                }
+            } else {
+                LOG.info("Skip merging file {} because it doesn't exist", 
f.getName());
+            }
+        }
+
+        LOG.info("Time to merge {} s", ((System.nanoTime() - startTime) / 
1000000000.0));
+    }
+
+    // Cleanup files
+    private void cleanup(List<File> inputFiles) {
+        for (File f : inputFiles) {
+            if (f.exists()) {
+                if (f.delete()) {
+                    LOG.info("Deleted temporary file {}", f.getName());
+                }
+            }
+        }
+        if (tempFolder.delete()) {
+            LOG.info("Deleted temporary folder at {}", 
tempFolder.getAbsolutePath());
+        }
+    }
+
+    // Get the data stream for the specified keyLength, create it if needed
+    private DataOutputStream getDataStream(int keyLength) throws IOException {
+        // Resize array if necessary
+        if (dataStreams.length <= keyLength) {
+            dataStreams = Arrays.copyOf(dataStreams, keyLength + 1);
+            dataFiles = Arrays.copyOf(dataFiles, keyLength + 1);
+        }
+
+        DataOutputStream dos = dataStreams[keyLength];
+        if (dos == null) {
+            File file = new File(tempFolder, "data" + keyLength + ".dat");
+            file.deleteOnExit();
+            dataFiles[keyLength] = file;
+
+            dos = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(file)));
+            dataStreams[keyLength] = dos;
+
+            // Write one byte so the zero offset is reserved
+            dos.writeByte(0);
+        }
+        return dos;
+    }
+
+    // Get the index stream for the specified keyLength, create it if needed
+    private DataOutputStream getIndexStream(int keyLength) throws IOException {
+        // Resize array if necessary
+        if (indexStreams.length <= keyLength) {
+            indexStreams = Arrays.copyOf(indexStreams, keyLength + 1);
+            indexFiles = Arrays.copyOf(indexFiles, keyLength + 1);
+            keyCounts = Arrays.copyOf(keyCounts, keyLength + 1);
+            maxOffsetLengths = Arrays.copyOf(maxOffsetLengths, keyLength + 1);
+            lastValues = Arrays.copyOf(lastValues, keyLength + 1);
+            lastValuesLength = Arrays.copyOf(lastValuesLength, keyLength + 1);
+            dataLengths = Arrays.copyOf(dataLengths, keyLength + 1);
+        }
+
+        // Get or create stream
+        DataOutputStream dos = indexStreams[keyLength];
+        if (dos == null) {
+            File file = new File(tempFolder, "temp_index" + keyLength + 
".dat");
+            file.deleteOnExit();
+            indexFiles[keyLength] = file;
+
+            dos = new DataOutputStream(new BufferedOutputStream(new 
FileOutputStream(file)));
+            indexStreams[keyLength] = dos;
+
+            dataLengths[keyLength]++;
+        }
+        return dos;
+    }
+
+    private int getNumKeyCount() {
+        int res = 0;
+        for (int count : keyCounts) {
+            if (count != 0) {
+                res++;
+            }
+        }
+        return res;
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
new file mode 100644
index 00000000..9960495e
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+/** A simple read buffer provides {@code readUnsignedByte} and position. */
+public final class SimpleReadBuffer {
+
+    private int pos = 0;
+    private final byte[] buf;
+
+    public SimpleReadBuffer(byte[] data) {
+        buf = data;
+    }
+
+    public byte[] getBuf() {
+        return buf;
+    }
+
+    public int getPos() {
+        return pos;
+    }
+
+    public SimpleReadBuffer reset() {
+        pos = 0;
+        return this;
+    }
+
+    public int readUnsignedByte() {
+        return buf[pos++] & 0xff;
+    }
+}
diff --git 
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
new file mode 100644
index 00000000..a6180a80
--- /dev/null
+++ 
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -0,0 +1,134 @@
+/*
+ * Copyright 2015 LinkedIn Corp. All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ */
+
+package org.apache.flink.table.store.utils;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/** Utils for encoding int/long to var length bytes. */
+public final class VarLengthIntUtils {
+
+    /** @return bytes length. */
+    public static int encodeLong(DataOutput os, long value) throws IOException 
{
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7FL) != 0) {
+            os.write((((int) value & 0x7F) | 0x80));
+            value >>>= 7;
+            i++;
+        }
+        os.write((byte) value);
+        return i;
+    }
+
+    /** @return bytes length. */
+    public static int encodeLong(byte[] bytes, long value) {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7FL) != 0) {
+            bytes[i - 1] = (byte) (((int) value & 0x7F) | 0x80);
+            value >>>= 7;
+            i++;
+        }
+        bytes[i - 1] = (byte) value;
+        return i;
+    }
+
+    public static long decodeLong(DataInput is) throws IOException {
+
+        long result = 0;
+        for (int offset = 0; offset < 64; offset += 7) {
+            long b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed long.");
+    }
+
+    public static long decodeLong(byte[] ba, int index) {
+        long result = 0;
+        for (int offset = 0; offset < 64; offset += 7) {
+            long b = ba[index++];
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed long.");
+    }
+
+    /** @return bytes length. */
+    public static int encodeInt(DataOutput os, int value) throws IOException {
+
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        int i = 1;
+        while ((value & ~0x7F) != 0) {
+            os.write(((value & 0x7F) | 0x80));
+            value >>>= 7;
+            i++;
+        }
+
+        os.write((byte) value);
+        return i;
+    }
+
+    public static int decodeInt(SimpleReadBuffer is) {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+
+    public static int decodeInt(DataInput is) throws IOException {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = is.readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+
+    public static int decodeInt(ByteBuffer bb) {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = bb.get() & 0xffff;
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+}
diff --git a/flink-table-store-common/src/main/resources/META-INF/NOTICE 
b/flink-table-store-common/src/main/resources/META-INF/NOTICE
index bc13a1ce..ef711cfb 100644
--- a/flink-table-store-common/src/main/resources/META-INF/NOTICE
+++ b/flink-table-store-common/src/main/resources/META-INF/NOTICE
@@ -4,6 +4,10 @@ Copyright 2014-2022 The Apache Software Foundation
 This product includes software developed at
 The Apache Software Foundation (http://www.apache.org/).
 
+This project bundles the following dependencies under the Apache Software 
License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- com.linkedin.paldb:paldb:1.2.0
+
 This project bundles the following dependencies under the BSD 3-clause license.
 See bundled license files for details.
 
diff --git 
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
new file mode 100644
index 00000000..6c76f425
--- /dev/null
+++ 
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
@@ -0,0 +1,421 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+/* This file is based on source code from the PalDB Project 
(https://github.com/linkedin/PalDB), licensed by the Apache
+ * Software Foundation (ASF) under the Apache License, Version 2.0. See the 
NOTICE file distributed with this work for
+ * additional information regarding copyright ownership. */
+
+package org.apache.flink.table.store.lookup.hash;
+
+import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.VarLengthIntUtils;
+
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link HashLookupStoreFactory}. */
+public class HashLookupStoreFactoryTest {
+
+    @TempDir Path tempDir;
+
+    private final RandomDataGenerator random = new RandomDataGenerator();
+
+    private File file;
+    private HashLookupStoreFactory factory;
+
+    @BeforeEach
+    public void setUp() throws IOException {
+        this.factory = new HashLookupStoreFactory(0.75d, true, 
MemorySize.ofMebiBytes(1024));
+        this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+        if (!file.createNewFile()) {
+            throw new IOException("Can not create file: " + file);
+        }
+    }
+
+    private byte[] toBytes(Object o) {
+        return toBytes(o.toString());
+    }
+
+    private byte[] toBytes(String str) {
+        return str.getBytes(StandardCharsets.UTF_8);
+    }
+
+    @Test
+    public void testEmpty() throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(file);
+        writer.close();
+
+        assertThat(file.exists()).isTrue();
+
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        assertThat(reader.lookup(toBytes(1))).isNull();
+
+        reader.close();
+    }
+
+    @Test
+    public void testOneKey() throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(file);
+        writer.put(toBytes(1), toBytes("foo"));
+        writer.close();
+
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(1))).isEqualTo(toBytes("foo"));
+        reader.close();
+    }
+
+    @Test
+    public void testTwoFirstKeyLength() throws IOException {
+        int key1 = 1;
+        int key2 = 245;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(0))).isNull();
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(246))).isNull();
+        assertThat(reader.lookup(toBytes(1245))).isNull();
+    }
+
+    @Test
+    public void testKeyLengthGap() throws IOException {
+        int key1 = 1;
+        int key2 = 2450;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(0))).isNull();
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(267))).isNull();
+        assertThat(reader.lookup(toBytes(2449))).isNull();
+        assertThat(reader.lookup(toBytes(2451))).isNull();
+        assertThat(reader.lookup(toBytes(2454441))).isNull();
+    }
+
+    @Test
+    public void testKeyLengthStartTwo() throws IOException {
+        int key1 = 245;
+        int key2 = 2450;
+
+        // Write
+        writeStore(file, new Object[] {key1, key2}, new Object[] {1, 6});
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        assertThat(reader.lookup(toBytes(key1))).isEqualTo(toBytes(1));
+        assertThat(reader.lookup((toBytes(key2)))).isEqualTo(toBytes(6));
+        assertThat(reader.lookup(toBytes(6))).isNull();
+        assertThat(reader.lookup(toBytes(244))).isNull();
+        assertThat(reader.lookup(toBytes(267))).isNull();
+        assertThat(reader.lookup(toBytes(2449))).isNull();
+        assertThat(reader.lookup(toBytes(2451))).isNull();
+        assertThat(reader.lookup(toBytes(2454441))).isNull();
+    }
+
+    @Test
+    public void testDataOnTwoBuffers() throws IOException {
+        Object[] keys = new Object[] {1, 2, 3};
+        Object[] values =
+                new Object[] {
+                    generateStringData(100), generateStringData(10000), 
generateStringData(100)
+                };
+
+        int byteSize = toBytes(values[0]).length + toBytes(values[1]).length;
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Read
+        factory = new HashLookupStoreFactory(0.75d, true, new 
MemorySize(byteSize - 100));
+        HashLookupStoreReader reader = factory.createReader(file);
+        for (int i = 0; i < keys.length; i++) {
+            
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+    }
+
+    @Test
+    public void testDataSizeOnTwoBuffers() throws IOException {
+        Object[] keys = new Object[] {1, 2, 3};
+        Object[] values =
+                new Object[] {
+                    generateStringData(100), generateStringData(10000), 
generateStringData(100)
+                };
+
+        byte[] b1 = toBytes(values[0]);
+        byte[] b2 = toBytes(values[1]);
+        int byteSize = b1.length + b2.length;
+        int sizeSize =
+                VarLengthIntUtils.encodeInt(new DataOutputSerializer(4), 
b1.length)
+                        + VarLengthIntUtils.encodeInt(new 
DataOutputSerializer(4), b2.length);
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Read
+        factory = new HashLookupStoreFactory(0.75d, true, new 
MemorySize(byteSize + sizeSize + 3));
+        HashLookupStoreReader reader = factory.createReader(file);
+        for (int i = 0; i < keys.length; i++) {
+            
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+    }
+
+    @Test
+    public void testReadStringToString() throws IOException {
+        testReadKeyToString(generateStringKeys(100));
+    }
+
+    @Test
+    public void testReadIntToString() throws IOException {
+        testReadKeyToString(generateIntKeys(100));
+    }
+
+    @Test
+    public void testReadDoubleToString() throws IOException {
+        testReadKeyToString(generateDoubleKeys(100));
+    }
+
+    @Test
+    public void testReadLongToString() throws IOException {
+        testReadKeyToString(generateLongKeys(100));
+    }
+
+    @Test
+    public void testReadStringToInt() throws IOException {
+        testReadKeyToInt(generateStringKeys(100));
+    }
+
+    @Test
+    public void testReadByteToInt() throws IOException {
+        testReadKeyToInt(generateByteKeys(100));
+    }
+
+    @Test
+    public void testReadIntToInt() throws IOException {
+        testReadKeyToInt(generateIntKeys(100));
+    }
+
+    @Test
+    public void testReadCompoundToString() throws IOException {
+        testReadKeyToString(generateCompoundKeys(100));
+    }
+
+    @Test
+    public void testReadCompoundByteToString() throws IOException {
+        testReadKeyToString(new Object[] {generateCompoundByteKey()});
+    }
+
+    @Test
+    public void testReadDisk() throws IOException {
+        Integer[] keys = generateIntKeys(10000);
+
+        // Write
+        Object[] values = generateStringData(keys.length, 1000);
+        writeStore(file, keys, values);
+
+        // Read
+        factory = new HashLookupStoreFactory(0.75d, false, 
MemorySize.ofMebiBytes(1024));
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+        reader.close();
+    }
+
+    @Test
+    public void testIterate() throws IOException {
+        Integer[] keys = generateIntKeys(100);
+        String[] values = generateStringData(keys.length, 12);
+
+        // Write
+        writeStore(file, keys, values);
+
+        // Sets
+        Set<Integer> keysSet = new HashSet<>(Arrays.asList(keys));
+        Set<String> valuesSet = new HashSet<>(Arrays.asList(values));
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+        Iterator<Map.Entry<byte[], byte[]>> itr = reader.iterator();
+        for (int i = 0; i < keys.length; i++) {
+            assertThat(itr.hasNext()).isTrue();
+            Map.Entry<byte[], byte[]> entry = itr.next();
+            assertThat(entry).isNotNull();
+            assertThat(keysSet.remove(Integer.valueOf(new 
String(entry.getKey())))).isTrue();
+            assertThat(valuesSet.remove(new 
String(entry.getValue()))).isTrue();
+
+            
assertThat(reader.lookup(entry.getKey())).isEqualTo(entry.getValue());
+        }
+        assertThat(itr.hasNext()).isFalse();
+        reader.close();
+
+        assertThat(keysSet).isEmpty();
+        assertThat(valuesSet).isEmpty();
+    }
+
+    // UTILITY
+
+    private void testReadKeyToString(Object[] keys) throws IOException {
+        // Write
+        Object[] values = generateStringData(keys.length, 10);
+        writeStore(file, keys, values);
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+
+        reader.close();
+    }
+
+    private void testReadKeyToInt(Object[] keys) throws IOException {
+        // Write
+        Integer[] values = generateIntData(keys.length);
+        writeStore(file, keys, values);
+
+        // Read
+        HashLookupStoreReader reader = factory.createReader(file);
+
+        for (int i = 0; i < keys.length; i++) {
+            
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
+        }
+
+        reader.close();
+    }
+
+    private void writeStore(File location, Object[] keys, Object[] values) 
throws IOException {
+        HashLookupStoreWriter writer = factory.createWriter(location);
+        for (int i = 0; i < keys.length; i++) {
+            writer.put(toBytes(keys[i]), toBytes(values[i]));
+        }
+        writer.close();
+    }
+
+    private Integer[] generateIntKeys(int count) {
+        Integer[] res = new Integer[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = i;
+        }
+        return res;
+    }
+
+    private String[] generateStringKeys(int count) {
+        String[] res = new String[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = i + "";
+        }
+        return res;
+    }
+
+    private Byte[] generateByteKeys(int count) {
+        if (count > 127) {
+            throw new RuntimeException("Too large range");
+        }
+        Byte[] res = new Byte[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (byte) i;
+        }
+        return res;
+    }
+
+    private Double[] generateDoubleKeys(int count) {
+        Double[] res = new Double[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (double) i;
+        }
+        return res;
+    }
+
+    private Long[] generateLongKeys(int count) {
+        Long[] res = new Long[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = (long) i;
+        }
+        return res;
+    }
+
+    private Object[] generateCompoundKeys(int count) {
+        Object[] res = new Object[count];
+        Random random = new Random(345);
+        for (int i = 0; i < count; i++) {
+            Object[] k = new Object[] {(byte) random.nextInt(10), i};
+            res[i] = k;
+        }
+        return res;
+    }
+
+    private Object[] generateCompoundByteKey() {
+        Object[] res = new Object[2];
+        res[0] = (byte) 6;
+        res[1] = (byte) 0;
+        return res;
+    }
+
+    private String generateStringData(int letters) {
+        return random.nextHexString(letters);
+    }
+
+    private String[] generateStringData(int count, int letters) {
+        String[] res = new String[count];
+        for (int i = 0; i < count; i++) {
+            res[i] = random.nextHexString(letters);
+        }
+        return res;
+    }
+
+    private Integer[] generateIntData(int count) {
+        Integer[] res = new Integer[count];
+        Random random = new Random(count + 34593263544354353L);
+        for (int i = 0; i < count; i++) {
+            res[i] = random.nextInt(1000000);
+        }
+        return res;
+    }
+}

Reply via email to