This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/master by this push:
new 5cd9746b [core] Introduce lookup changelog producer
5cd9746b is described below
commit 5cd9746b1df2674ee4ca8ab00f07e3dff24848d6
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Mar 15 10:23:31 2023 +0800
[core] Introduce lookup changelog producer
This closes #590
---
docs/content/docs/concepts/primary-key-table.md | 48 +++-
.../shortcodes/generated/core_configuration.html | 26 +-
.../generated/flink_connector_configuration.html | 6 +
docs/static/img/changelog-producer-lookup.png | Bin 0 -> 79473 bytes
.../table/store/data/RandomAccessInputView.java | 4 +-
.../table/store/io/SeekableDataInputView.java | 25 +-
.../flink/table/store/io/cache/CacheManager.java | 137 +++++++++++
.../store/io/cache/CachedRandomInputView.java | 108 ++++++++
.../store/lookup/hash/HashLookupStoreFactory.java | 12 +-
.../store/lookup/hash/HashLookupStoreReader.java | 190 +++-----------
.../flink/table/store/options/MemorySize.java | 4 +
.../apache/flink/table/store/utils/IOFunction.java | 21 +-
.../apache/flink/table/store/utils/MathUtils.java | 11 +
.../flink/table/store/utils/SimpleReadBuffer.java | 43 ----
.../flink/table/store/utils/VarLengthIntUtils.java | 23 --
.../store/io/cache/CachedRandomInputViewTest.java | 106 ++++++++
.../lookup/hash/HashLookupStoreFactoryTest.java | 37 ++-
.../org/apache/flink/table/store/CoreOptions.java | 34 ++-
.../file/append/AppendOnlyCompactManager.java | 4 +
.../table/store/file/compact/CompactManager.java | 3 +-
.../store/file/compact/NoopCompactManager.java | 4 +
.../table/store/file/mergetree/DataFileReader.java | 68 +++++
.../flink/table/store/file/mergetree/Levels.java | 23 ++
.../table/store/file/mergetree/LookupLevels.java | 268 ++++++++++++++++++++
.../store/file/mergetree/MergeTreeWriter.java | 1 +
.../mergetree/compact/AbstractCompactRewriter.java | 4 +
.../file/mergetree/compact/CompactRewriter.java | 3 +-
.../FullChangelogMergeTreeCompactRewriter.java | 4 +
.../file/mergetree/compact/LookupCompaction.java | 59 +++++
...er.java => LookupMergeTreeCompactRewriter.java} | 55 +++--
.../mergetree/compact/MergeTreeCompactManager.java | 6 +
.../mergetree/compact/UniversalCompaction.java | 1 +
.../file/operation/KeyValueFileStoreWrite.java | 81 ++++--
.../store/file/operation/MemoryFileStoreWrite.java | 15 +-
.../table/store/file/schema/SchemaValidation.java | 19 +-
.../table/ChangelogWithKeyFileStoreTable.java | 10 +-
.../store/table/source/StreamDataTableScan.java | 19 +-
.../table/source/StreamDataTableScanImpl.java | 32 ++-
.../store/file/mergetree/LookupLevelsTest.java | 274 +++++++++++++++++++++
.../store/connector/FlinkConnectorOptions.java | 11 +
.../table/store/connector/sink/FlinkSink.java | 49 ++--
.../sink/LookupChangelogStoreSinkWrite.java | 50 ++++
.../table/store/connector/CatalogITCaseBase.java | 5 +
.../ChangelogWithKeyFileStoreTableITCase.java | 239 ++++++++++++------
.../connector/LookupChangelogWithAggITCase.java | 57 +++++
.../table/store/connector/LookupJoinITCase.java | 2 +-
46 files changed, 1760 insertions(+), 441 deletions(-)
diff --git a/docs/content/docs/concepts/primary-key-table.md
b/docs/content/docs/concepts/primary-key-table.md
index 84d20247..7cfa1aa0 100644
--- a/docs/content/docs/concepts/primary-key-table.md
+++ b/docs/content/docs/concepts/primary-key-table.md
@@ -60,7 +60,7 @@ For example, let's say Table Store receives three records:
If the first column is the primary key, the final result will be `<1, 25.2,
10, 'This is a book'>`.
{{< hint info >}}
-For streaming queries, `partial-update` merge engine must be used together
with `full-compaction` [changelog producer]({{< ref
"docs/concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `partial-update` merge engine must be used together
with `lookup` or `full-compaction` [changelog producer]({{< ref
"docs/concepts/primary-key-table#changelog-producers" >}}).
{{< /hint >}}
{{< hint info >}}
@@ -109,7 +109,7 @@ If you allow some functions to ignore retraction messages,
you can configure:
`'fields.${field_name}.ignore-retract'='true'`.
{{< hint info >}}
-For streaming queries, `aggregation` merge engine must be used together with
`full-compaction` [changelog producer]({{< ref
"docs/concepts/primary-key-table#changelog-producers" >}}).
+For streaming queries, `aggregation` merge engine must be used together with
`lookup` or `full-compaction` [changelog producer]({{< ref
"docs/concepts/primary-key-table#changelog-producers" >}}).
{{< /hint >}}
## Changelog Producers
@@ -144,9 +144,51 @@ By specifying `'changelog-producer' = 'input'`, Table
Store writers rely on thei
{{< img src="/img/changelog-producer-input.png">}}
+### Lookup
+
+If your input can’t produce a complete changelog but you still want to get rid
of the costly normalized operator, you may consider using the `'lookup'`
changelog producer.
+
+By specifying `'changelog-producer' = 'lookup'`, Table Store will generate
the changelog through `'lookup'` before committing the data write.
+
+{{< img src="/img/changelog-producer-lookup.png">}}
+
+Lookup will cache data in memory and on the local disk. You can use the
following options to tune performance:
+
+<table class="table table-bordered">
+ <thead>
+ <tr>
+ <th class="text-left" style="width: 20%">Option</th>
+ <th class="text-left" style="width: 5%">Default</th>
+ <th class="text-left" style="width: 10%">Type</th>
+ <th class="text-left" style="width: 60%">Description</th>
+ </tr>
+ </thead>
+ <tbody>
+ <tr>
+ <td><h5>lookup.cache-file-retention</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The cached files retention time for lookup. After the file
expires, if there is a need for access, it will be re-read from the DFS to
build an index on the local disk.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.cache-max-disk-size</h5></td>
+ <td style="word-wrap: break-word;">unlimited</td>
+ <td>MemorySize</td>
+ <td>Max disk size for lookup cache, you can use this option to limit
the use of local disks.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.cache-max-memory-size</h5></td>
+ <td style="word-wrap: break-word;">256 mb</td>
+ <td>MemorySize</td>
+ <td>Max memory size for lookup cache.</td>
+ </tr>
+ </tbody>
+</table>
+
### Full Compaction
-If your input can’t produce a complete changelog but you still want to get rid
of the costly normalized operator, you may consider using the full compaction
changelog producer.
+If you think the resource consumption of 'lookup' is too high, you can
consider using the 'full-compaction' changelog producer,
+which decouples data writing from changelog generation and is better suited
to scenarios with high latency (for example, 10 minutes).
By specifying `'changelog-producer' = 'full-compaction'`, Table Store will
compare the results between full compactions and produce the differences as
changelog. The latency of changelog is affected by the frequency of full
compactions.
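A minimal sketch (not part of this commit) of supplying the options above programmatically; it relies only on the string keys documented here and on the public CoreOptions(Map<String, String>) constructor that appears later in this diff:

    import java.util.HashMap;
    import java.util.Map;

    import org.apache.flink.table.store.CoreOptions;

    public class LookupChangelogOptionsSketch {

        // Builds table options that switch changelog generation to the 'lookup' producer.
        public static CoreOptions lookupProducerOptions() {
            Map<String, String> conf = new HashMap<>();
            conf.put("changelog-producer", "lookup");
            // Tuning knobs from the table above; the values shown are the defaults.
            conf.put("lookup.cache-max-memory-size", "256 mb");
            conf.put("lookup.cache-file-retention", "1 h");
            return new CoreOptions(conf);
        }
    }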
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index fc58a74e..d93339dc 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -30,7 +30,7 @@
<td><h5>changelog-producer</h5></td>
<td style="word-wrap: break-word;">none</td>
<td><p>Enum</p></td>
- <td>Whether to double write to a changelog file. This changelog
file keeps the details of data changes, it can be read directly during stream
reads.<br /><br />Possible values:<ul><li>"none": No changelog
file.</li><li>"input": Double write to a changelog file when flushing memory
table, the changelog is from input.</li><li>"full-compaction": Generate
changelog files with each full compaction.</li></ul></td>
+ <td>Whether to double write to a changelog file. This changelog
file keeps the details of data changes, it can be read directly during stream
reads.<br /><br />Possible values:<ul><li>"none": No changelog
file.</li><li>"input": Double write to a changelog file when flushing memory
table, the changelog is from input.</li><li>"full-compaction": Generate
changelog files with each full compaction.</li><li>"lookup": Generate changelog
files through 'lookup' before committing the d [...]
</tr>
<tr>
<td><h5>commit.force-compact</h5></td>
@@ -122,6 +122,30 @@
<td>Boolean</td>
<td>Whether to force the removal of the normalize node when
streaming read. Note: This is dangerous and is likely to cause data errors if
downstream is used to calculate aggregation and the input is not complete
changelog.</td>
</tr>
+ <tr>
+ <td><h5>lookup.cache-file-retention</h5></td>
+ <td style="word-wrap: break-word;">1 h</td>
+ <td>Duration</td>
+ <td>The cached files retention time for lookup. After the file
expires, if there is a need for access, it will be re-read from the DFS to
build an index on the local disk.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.cache-max-disk-size</h5></td>
+ <td style="word-wrap: break-word;">9223372036854775807 bytes</td>
+ <td>MemorySize</td>
+ <td>Max disk size for lookup cache, you can use this option to
limit the use of local disks.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.cache-max-memory-size</h5></td>
+ <td style="word-wrap: break-word;">256 mb</td>
+ <td>MemorySize</td>
+ <td>Max memory size for lookup cache.</td>
+ </tr>
+ <tr>
+ <td><h5>lookup.hash-load-factor</h5></td>
+ <td style="word-wrap: break-word;">0.75</td>
+ <td>Float</td>
+ <td>The index load factor for lookup.</td>
+ </tr>
<tr>
<td><h5>manifest.format</h5></td>
<td style="word-wrap: break-word;">"avro"</td>
diff --git
a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
index aa51d674..092452c9 100644
--- a/docs/layouts/shortcodes/generated/flink_connector_configuration.html
+++ b/docs/layouts/shortcodes/generated/flink_connector_configuration.html
@@ -14,6 +14,12 @@
<td>Duration</td>
<td>When changelog-producer is set to FULL_COMPACTION, full
compaction will be constantly triggered after this interval.</td>
</tr>
+ <tr>
+ <td><h5>changelog-producer.lookup-wait</h5></td>
+ <td style="word-wrap: break-word;">true</td>
+ <td>Boolean</td>
+ <td>When changelog-producer is set to LOOKUP, commit will wait for
changelog generation by lookup.</td>
+ </tr>
<tr>
<td><h5>log.system</h5></td>
<td style="word-wrap: break-word;">"none"</td>
diff --git a/docs/static/img/changelog-producer-lookup.png
b/docs/static/img/changelog-producer-lookup.png
new file mode 100644
index 00000000..0540863e
Binary files /dev/null and b/docs/static/img/changelog-producer-lookup.png
differ
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
index da69a9a8..18de2307 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/data/RandomAccessInputView.java
@@ -18,6 +18,7 @@
package org.apache.flink.table.store.data;
+import org.apache.flink.table.store.io.SeekableDataInputView;
import org.apache.flink.table.store.memory.MemorySegment;
import org.apache.flink.table.store.utils.MathUtils;
@@ -25,7 +26,7 @@ import java.io.EOFException;
import java.util.ArrayList;
/** A {@link AbstractPagedInputView} to read pages in memory. */
-public class RandomAccessInputView extends AbstractPagedInputView {
+public class RandomAccessInputView extends AbstractPagedInputView implements
SeekableDataInputView {
private final ArrayList<MemorySegment> segments;
@@ -54,6 +55,7 @@ public class RandomAccessInputView extends
AbstractPagedInputView {
this.limitInLastSegment = limitInLastSegment;
}
+ @Override
public void setReadPosition(long position) {
final int bufferNum = (int) (position >>> this.segmentSizeBits);
final int offset = (int) (position & this.segmentSizeMask);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
similarity index 59%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
copy to
flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
index d2b62bd1..0da271de 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/SeekableDataInputView.java
@@ -16,19 +16,18 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.io;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
-
-import java.util.List;
-
-/** Rewrite sections to the files. */
-public interface CompactRewriter {
-
- CompactResult rewrite(int outputLevel, boolean dropDelete,
List<List<SortedRun>> sections)
- throws Exception;
+/**
+ * Interface marking a {@link DataInputView} as seekable. Seekable views can
set the position where
+ * they read from.
+ */
+public interface SeekableDataInputView extends DataInputView {
- CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
+ /**
+ * Sets the read pointer to the given position.
+ *
+ * @param position The new read position.
+ */
+ void setReadPosition(long position);
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
new file mode 100644
index 00000000..7daeae83
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CacheManager.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.Objects;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Consumer;
+
+/** Cache manager to cache bytes to paged {@link MemorySegment}s. */
+public class CacheManager {
+
+ private final int pageSize;
+ private final Cache<CacheKey, CacheValue> cache;
+
+ public CacheManager(int pageSize, MemorySize maxMemorySize) {
+ this.pageSize = pageSize;
+ this.cache =
+ CacheBuilder.newBuilder()
+ .weigher(this::weigh)
+ .maximumWeight(maxMemorySize.getBytes())
+ .removalListener(this::onRemoval)
+ .build();
+ }
+
+ @VisibleForTesting
+ Cache<CacheKey, CacheValue> cache() {
+ return cache;
+ }
+
+ public int pageSize() {
+ return pageSize;
+ }
+
+ public MemorySegment getPage(
+ RandomAccessFile file, int pageNumber, Consumer<Integer>
cleanCallback) {
+ CacheKey key = new CacheKey(file, pageNumber);
+ CacheValue value;
+ try {
+ value = cache.get(key, () -> createValue(key, cleanCallback));
+ } catch (ExecutionException e) {
+ throw new RuntimeException(e);
+ }
+ return value.segment;
+ }
+
+ public void invalidPage(RandomAccessFile file, int pageNumber) {
+ cache.invalidate(new CacheKey(file, pageNumber));
+ }
+
+ private int weigh(CacheKey cacheKey, CacheValue cacheValue) {
+ return cacheValue.segment.size();
+ }
+
+ private void onRemoval(RemovalNotification<CacheKey, CacheValue>
notification) {
+
notification.getValue().cleanCallback.accept(notification.getKey().pageNumber);
+ }
+
+ private CacheValue createValue(CacheKey key, Consumer<Integer>
cleanCallback)
+ throws IOException {
+ return new CacheValue(key.read(pageSize), cleanCallback);
+ }
+
+ private static class CacheKey {
+
+ private final RandomAccessFile file;
+ private final int pageNumber;
+
+ private CacheKey(RandomAccessFile file, int pageNumber) {
+ this.file = file;
+ this.pageNumber = pageNumber;
+ }
+
+ private MemorySegment read(int pageSize) throws IOException {
+ long length = file.length();
+ long pageAddress = (long) pageNumber * pageSize;
+ int len = (int) Math.min(pageSize, length - pageAddress);
+ byte[] bytes = new byte[len];
+ file.seek(pageAddress);
+ file.readFully(bytes);
+ return MemorySegment.wrap(bytes);
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ CacheKey cacheKey = (CacheKey) o;
+ return pageNumber == cacheKey.pageNumber && Objects.equals(file,
cacheKey.file);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(file, pageNumber);
+ }
+ }
+
+ private static class CacheValue {
+
+ private final MemorySegment segment;
+ private final Consumer<Integer> cleanCallback;
+
+ private CacheValue(MemorySegment segment, Consumer<Integer>
cleanCallback) {
+ this.segment = segment;
+ this.cleanCallback = cleanCallback;
+ }
+ }
+}
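A minimal usage sketch (mine, not from this commit), mirroring how the tests below exercise the cache; it assumes args[0] points at a non-empty local file:

    import java.io.File;
    import java.io.IOException;
    import java.io.RandomAccessFile;

    import org.apache.flink.table.store.io.cache.CacheManager;
    import org.apache.flink.table.store.memory.MemorySegment;
    import org.apache.flink.table.store.options.MemorySize;

    public class CacheManagerSketch {

        public static void main(String[] args) throws IOException {
            // 1 KiB pages, at most 128 KiB of cached pages in total.
            CacheManager cacheManager = new CacheManager(1024, MemorySize.ofKibiBytes(128));
            try (RandomAccessFile file = new RandomAccessFile(new File(args[0]), "r")) {
                // Pages are keyed by (file, pageNumber); the callback runs when a page is evicted.
                MemorySegment page = cacheManager.getPage(file, 0, evictedPage -> {});
                System.out.println("cached " + page.size() + " bytes of page 0");
                // Drop the page explicitly once it is no longer needed.
                cacheManager.invalidPage(file, 0);
            }
        }
    }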
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
new file mode 100644
index 00000000..8d8e5f2f
--- /dev/null
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/io/cache/CachedRandomInputView.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.data.AbstractPagedInputView;
+import org.apache.flink.table.store.io.SeekableDataInputView;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.utils.MathUtils;
+
+import java.io.Closeable;
+import java.io.EOFException;
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link SeekableDataInputView} to read bytes from a {@link
RandomAccessFile}; the bytes can be
+ * cached as {@link MemorySegment}s in the {@link CacheManager}.
+ */
+public class CachedRandomInputView extends AbstractPagedInputView
+ implements SeekableDataInputView, Closeable {
+
+ private final RandomAccessFile file;
+ private final long fileLength;
+ private final CacheManager cacheManager;
+ private final Map<Integer, MemorySegment> segments;
+ private final int segmentSizeBits;
+ private final int segmentSizeMask;
+
+ private int currentSegmentIndex;
+
+ public CachedRandomInputView(File file, CacheManager cacheManager)
+ throws FileNotFoundException {
+ this.file = new RandomAccessFile(file, "r");
+ this.fileLength = file.length();
+ this.cacheManager = cacheManager;
+ this.segments = new HashMap<>();
+ int segmentSize = cacheManager.pageSize();
+ this.segmentSizeBits = MathUtils.log2strict(segmentSize);
+ this.segmentSizeMask = segmentSize - 1;
+
+ this.currentSegmentIndex = -1;
+ }
+
+ @Override
+ public void setReadPosition(long position) {
+ final int pageNumber = (int) (position >>> this.segmentSizeBits);
+ final int offset = (int) (position & this.segmentSizeMask);
+ this.currentSegmentIndex = pageNumber;
+ MemorySegment segment = getCurrentPage();
+ seekInput(segment, offset, getLimitForSegment(segment));
+ }
+
+ private MemorySegment getCurrentPage() {
+ return segments.computeIfAbsent(
+ currentSegmentIndex,
+ key -> cacheManager.getPage(file, currentSegmentIndex,
this::invalidPage));
+ }
+
+ @Override
+ protected MemorySegment nextSegment(MemorySegment current) throws
EOFException {
+ currentSegmentIndex++;
+ if ((long) currentSegmentIndex << segmentSizeBits >= fileLength) {
+ throw new EOFException();
+ }
+
+ return getCurrentPage();
+ }
+
+ @Override
+ protected int getLimitForSegment(MemorySegment segment) {
+ return segment.size();
+ }
+
+ private void invalidPage(int pageNumber) {
+ segments.remove(pageNumber);
+ }
+
+ @Override
+ public void close() throws IOException {
+ // copy out to avoid ConcurrentModificationException
+ List<Integer> pages = new ArrayList<>(segments.keySet());
+ pages.forEach(page -> cacheManager.invalidPage(file, page));
+
+ file.close();
+ }
+}
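A small sketch (mine) of positioned reads through the view, following the pattern used by CachedRandomInputViewTest later in this diff; it assumes a local file at least position + 8 bytes long:

    import java.io.File;
    import java.io.IOException;

    import org.apache.flink.table.store.io.cache.CacheManager;
    import org.apache.flink.table.store.io.cache.CachedRandomInputView;
    import org.apache.flink.table.store.options.MemorySize;

    public class CachedReadSketch {

        public static long readLongAt(File file, long position) throws IOException {
            // The page size must be a power of two (log2strict is applied to it).
            CacheManager cacheManager = new CacheManager(1024, MemorySize.ofKibiBytes(128));
            CachedRandomInputView view = new CachedRandomInputView(file, cacheManager);
            try {
                // Seek to an absolute byte position; the backing page comes from the cache.
                view.setReadPosition(position);
                return view.readLong();
            } finally {
                // Closing releases the cached pages and the underlying file handle.
                view.close();
            }
        }
    }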
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
index 895ab08e..d827775a 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactory.java
@@ -18,8 +18,8 @@
package org.apache.flink.table.store.lookup.hash;
+import org.apache.flink.table.store.io.cache.CacheManager;
import org.apache.flink.table.store.lookup.LookupStoreFactory;
-import org.apache.flink.table.store.options.MemorySize;
import java.io.File;
import java.io.IOException;
@@ -27,14 +27,12 @@ import java.io.IOException;
/** A {@link LookupStoreFactory} which uses hash to lookup records on disk. */
public class HashLookupStoreFactory implements LookupStoreFactory {
+ private final CacheManager cacheManager;
private final double loadFactor;
- private final boolean useMmp;
- private final MemorySize mmpSegmentSize;
- public HashLookupStoreFactory(double loadFactor, boolean useMmp,
MemorySize mmpSegmentSize) {
+ public HashLookupStoreFactory(CacheManager cacheManager, double
loadFactor) {
+ this.cacheManager = cacheManager;
this.loadFactor = loadFactor;
- this.useMmp = useMmp;
- this.mmpSegmentSize = mmpSegmentSize;
}
@Override
@@ -44,6 +42,6 @@ public class HashLookupStoreFactory implements
LookupStoreFactory {
@Override
public HashLookupStoreReader createReader(File file) throws IOException {
- return new HashLookupStoreReader(useMmp, mmpSegmentSize.getBytes(),
file);
+ return new HashLookupStoreReader(cacheManager, file);
}
}
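A minimal sketch (mine, mirroring the updated factory test later in this diff) of the new wiring, where the reader is backed by a shared CacheManager instead of memory mapping; it assumes storeFile was previously produced by the factory's writer:

    import java.io.File;
    import java.io.IOException;

    import org.apache.flink.table.store.io.cache.CacheManager;
    import org.apache.flink.table.store.lookup.hash.HashLookupStoreFactory;
    import org.apache.flink.table.store.lookup.hash.HashLookupStoreReader;
    import org.apache.flink.table.store.options.MemorySize;

    public class HashLookupFactorySketch {

        public static byte[] lookupOnce(File storeFile, byte[] key) throws IOException {
            HashLookupStoreFactory factory =
                    new HashLookupStoreFactory(
                            new CacheManager(1024, MemorySize.ofMebiBytes(1)), 0.75d);
            HashLookupStoreReader reader = factory.createReader(storeFile);
            try {
                // Returns the stored value bytes, or null if the key is absent.
                return reader.lookup(key);
            } finally {
                reader.close();
            }
        }
    }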
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
index 390d599b..f04908d7 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreReader.java
@@ -22,9 +22,10 @@
package org.apache.flink.table.store.lookup.hash;
+import org.apache.flink.table.store.io.cache.CacheManager;
+import org.apache.flink.table.store.io.cache.CachedRandomInputView;
import org.apache.flink.table.store.lookup.LookupStoreReader;
import org.apache.flink.table.store.utils.MurmurHashUtils;
-import org.apache.flink.table.store.utils.SimpleReadBuffer;
import org.apache.flink.table.store.utils.VarLengthIntUtils;
import org.slf4j.Logger;
@@ -32,15 +33,10 @@ import org.slf4j.LoggerFactory;
import java.io.BufferedInputStream;
import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.File;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.nio.ByteBuffer;
-import java.nio.MappedByteBuffer;
-import java.nio.channels.FileChannel;
import java.text.DecimalFormat;
import java.text.SimpleDateFormat;
import java.util.Arrays;
@@ -55,8 +51,6 @@ public class HashLookupStoreReader
private static final Logger LOG =
LoggerFactory.getLogger(HashLookupStoreReader.class.getName());
- // Buffer segment size
- private final long mmpSegmentSize;
// Key count for each key length
private final int[] keyCounts;
// Slot size for each key length
@@ -65,44 +59,28 @@ public class HashLookupStoreReader
private final int[] slots;
// Offset of the index for different key length
private final int[] indexOffsets;
- // Offset of the data in the channel
- private final long dataOffset;
// Offset of the data for different key length
private final long[] dataOffsets;
- // Data size
- private final long dataSize;
- // Index and data buffers
- private MappedByteBuffer indexBuffer;
- private MappedByteBuffer[] dataBuffers;
- // FileChannel
- private RandomAccessFile mappedFile;
- private FileChannel channel;
- // Use MMap for data?
- private final boolean mMapData;
+ // File input view
+ private CachedRandomInputView inputView;
// Buffers
- private final SimpleReadBuffer sizeBuffer = new SimpleReadBuffer(new
byte[5]);
private final byte[] slotBuffer;
- HashLookupStoreReader(boolean useMmp, long mmpSegmentSize, File file)
throws IOException {
+ HashLookupStoreReader(CacheManager cacheManager, File file) throws
IOException {
// File path
if (!file.exists()) {
throw new FileNotFoundException("File " + file.getAbsolutePath() +
" not found");
}
LOG.info("Opening file {}", file.getName());
- this.mmpSegmentSize = mmpSegmentSize;
- // Check valid segmentSize
- if (this.mmpSegmentSize > Integer.MAX_VALUE) {
- throw new IllegalArgumentException("The mmpSegmentSize can't be
larger than 2GB");
- }
-
// Open file and read metadata
long createdAt;
FileInputStream inputStream = new FileInputStream(file);
DataInputStream dataInputStream = new DataInputStream(new
BufferedInputStream(inputStream));
// Offset of the index in the channel
- int indexOffset;
int keyCount;
+ int indexOffset;
+ long dataOffset;
try {
// Time
createdAt = dataInputStream.readLong();
@@ -136,9 +114,16 @@ public class HashLookupStoreReader
slotBuffer = new byte[maxSlotSize];
- // Read index and data offset
+ // Read index offset and add it to each indexOffsets entry
indexOffset = dataInputStream.readInt();
+ for (int i = 0; i < indexOffsets.length; i++) {
+ indexOffsets[i] = indexOffset + indexOffsets[i];
+ }
+ // Read data offset and add it to each dataOffsets entry
dataOffset = dataInputStream.readLong();
+ for (int i = 0; i < dataOffsets.length; i++) {
+ dataOffsets[i] = dataOffset + dataOffsets[i];
+ }
} finally {
// Close metadata
dataInputStream.close();
@@ -146,42 +131,7 @@ public class HashLookupStoreReader
}
// Create Mapped file in read-only mode
- mappedFile = new RandomAccessFile(file, "r");
- channel = mappedFile.getChannel();
- long fileSize = file.length();
-
- // Create index buffer
- indexBuffer =
- channel.map(FileChannel.MapMode.READ_ONLY, indexOffset,
dataOffset - indexOffset);
-
- // Create data buffers
- dataSize = fileSize - dataOffset;
-
- // Check if data size fits in memory map limit
- if (!useMmp) {
- // Use classical disk read
- mMapData = false;
- dataBuffers = null;
- } else {
- // Use Mmap
- mMapData = true;
-
- // Build data buffers
- int bufArraySize =
- (int) (dataSize / this.mmpSegmentSize)
- + ((dataSize % this.mmpSegmentSize != 0) ? 1 : 0);
- dataBuffers = new MappedByteBuffer[bufArraySize];
- int bufIdx = 0;
- for (long offset = 0; offset < dataSize; offset +=
this.mmpSegmentSize) {
- long remainingFileSize = dataSize - offset;
- long thisSegmentSize = Math.min(this.mmpSegmentSize,
remainingFileSize);
- dataBuffers[bufIdx++] =
- channel.map(
- FileChannel.MapMode.READ_ONLY,
- dataOffset + offset,
- thisSegmentSize);
- }
- }
+ inputView = new CachedRandomInputView(file, cacheManager);
// logging
DecimalFormat integerFormat = new DecimalFormat("#,##0.00");
@@ -201,13 +151,8 @@ public class HashLookupStoreReader
.append(integerFormat.format((dataOffset - indexOffset) /
(1024.0 * 1024.0)))
.append(" Mb\n");
statMsg.append(" Data size: ")
- .append(integerFormat.format((fileSize - dataOffset) / (1024.0
* 1024.0)))
+ .append(integerFormat.format((file.length() - dataOffset) /
(1024.0 * 1024.0)))
.append(" Mb\n");
- if (mMapData) {
- statMsg.append(" Number of memory mapped data buffers:
").append(dataBuffers.length);
- } else {
- statMsg.append(" Memory mapped data disabled, using disk");
- }
LOG.info(statMsg.toString());
}
@@ -217,25 +162,23 @@ public class HashLookupStoreReader
if (keyLength >= slots.length || keyCounts[keyLength] == 0) {
return null;
}
- long hash = MurmurHashUtils.hashBytesPositive(key);
+ int hash = MurmurHashUtils.hashBytesPositive(key);
int numSlots = slots[keyLength];
int slotSize = slotSizes[keyLength];
int indexOffset = indexOffsets[keyLength];
long dataOffset = dataOffsets[keyLength];
for (int probe = 0; probe < numSlots; probe++) {
- int slot = (int) ((hash + probe) % numSlots);
- indexBuffer.position(indexOffset + slot * slotSize);
- indexBuffer.get(slotBuffer, 0, slotSize);
+ long slot = (hash + probe) % numSlots;
+ inputView.setReadPosition(indexOffset + slot * slotSize);
+ inputView.readFully(slotBuffer, 0, slotSize);
long offset = VarLengthIntUtils.decodeLong(slotBuffer, keyLength);
if (offset == 0) {
return null;
}
if (isKey(slotBuffer, key)) {
- return mMapData
- ? getMMapBytes(dataOffset + offset)
- : getDiskBytes(dataOffset + offset);
+ return getValue(dataOffset + offset);
}
}
return null;
@@ -250,85 +193,18 @@ public class HashLookupStoreReader
return true;
}
- // Read the data at the given offset, the data can be spread over multiple
data buffers
- private byte[] getMMapBytes(long offset) {
- // Read the first 4 bytes to get the size of the data
- ByteBuffer buf = getDataBuffer(offset);
- int maxLen = (int) Math.min(5, dataSize - offset);
-
- int size;
- if (buf.remaining() >= maxLen) {
- // Continuous read
- int pos = buf.position();
- size = VarLengthIntUtils.decodeInt(buf);
-
- // Used in case of data is spread over multiple buffers
- offset += buf.position() - pos;
- } else {
- // The size of the data is spread over multiple buffers
- int len = maxLen;
- int off = 0;
- sizeBuffer.reset();
- while (len > 0) {
- buf = getDataBuffer(offset + off);
- int count = Math.min(len, buf.remaining());
- buf.get(sizeBuffer.getBuf(), off, count);
- off += count;
- len -= count;
- }
- size = VarLengthIntUtils.decodeInt(sizeBuffer);
- offset += sizeBuffer.getPos();
- buf = getDataBuffer(offset);
- }
-
- // Create output bytes
- byte[] res = new byte[size];
-
- // Check if the data is one buffer
- if (buf.remaining() >= size) {
- // Continuous read
- buf.get(res, 0, size);
- } else {
- int len = size;
- int off = 0;
- while (len > 0) {
- buf = getDataBuffer(offset);
- int count = Math.min(len, buf.remaining());
- buf.get(res, off, count);
- offset += count;
- off += count;
- len -= count;
- }
- }
-
- return res;
- }
-
- // Get data from disk
- private byte[] getDiskBytes(long offset) throws IOException {
- mappedFile.seek(dataOffset + offset);
+ private byte[] getValue(long offset) throws IOException {
+ inputView.setReadPosition(offset);
// Get size of data
- int size = VarLengthIntUtils.decodeInt(mappedFile);
+ int size = VarLengthIntUtils.decodeInt(inputView);
// Create output bytes
byte[] res = new byte[size];
-
- // Read data
- if (mappedFile.read(res) == -1) {
- throw new EOFException();
- }
-
+ inputView.readFully(res);
return res;
}
- // Return the data buffer for the given position
- private ByteBuffer getDataBuffer(long index) {
- ByteBuffer buf = dataBuffers[(int) (index / mmpSegmentSize)];
- buf.position((int) (index % mmpSegmentSize));
- return buf;
- }
-
private String formatCreatedAt(long createdAt) {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy.MM.dd G 'at'
HH:mm:ss z");
Calendar cl = Calendar.getInstance();
@@ -338,12 +214,8 @@ public class HashLookupStoreReader
@Override
public void close() throws IOException {
- channel.close();
- mappedFile.close();
- indexBuffer = null;
- dataBuffers = null;
- mappedFile = null;
- channel = null;
+ inputView.close();
+ inputView = null;
}
@Override
@@ -393,11 +265,11 @@ public class HashLookupStoreReader
@Override
public FastEntry next() {
try {
- indexBuffer.position(currentIndexOffset);
+ inputView.setReadPosition(currentIndexOffset);
long offset = 0;
while (offset == 0) {
- indexBuffer.get(currentSlotBuffer);
+ inputView.readFully(currentSlotBuffer);
offset = VarLengthIntUtils.decodeLong(currentSlotBuffer,
currentKeyLength);
currentIndexOffset += currentSlotBuffer.length;
}
@@ -407,7 +279,7 @@ public class HashLookupStoreReader
if (withValue) {
long valueOffset = currentDataOffset + offset;
- value = mMapData ? getMMapBytes(valueOffset) :
getDiskBytes(valueOffset);
+ value = getValue(valueOffset);
}
entry.set(key, value);
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
index 6595c729..b906d66c 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/options/MemorySize.java
@@ -84,6 +84,10 @@ public class MemorySize implements java.io.Serializable,
Comparable<MemorySize>
return new MemorySize(mebiBytes << 20);
}
+ public static MemorySize ofKibiBytes(long kibiBytes) {
+ return new MemorySize(kibiBytes << 10);
+ }
+
// ------------------------------------------------------------------------
/** Gets the memory size in bytes. */
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
similarity index 59%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
copy to
flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
index d2b62bd1..a6d761b3 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/IOFunction.java
@@ -16,19 +16,16 @@
* limitations under the License.
*/
-package org.apache.flink.table.store.file.mergetree.compact;
+package org.apache.flink.table.store.utils;
-import org.apache.flink.table.store.file.compact.CompactResult;
-import org.apache.flink.table.store.file.io.DataFileMeta;
-import org.apache.flink.table.store.file.mergetree.SortedRun;
+import java.io.IOException;
-import java.util.List;
-
-/** Rewrite sections to the files. */
-public interface CompactRewriter {
-
- CompactResult rewrite(int outputLevel, boolean dropDelete,
List<List<SortedRun>> sections)
- throws Exception;
+/**
+ * A functional interface for a {@link java.util.function.Function} that may
throw {@link
+ * IOException}.
+ */
+@FunctionalInterface
+public interface IOFunction<T, R> {
- CompactResult upgrade(int outputLevel, DataFileMeta file) throws Exception;
+ R apply(T value) throws IOException;
}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
index b14cc55b..6eb7cc15 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/MathUtils.java
@@ -21,6 +21,17 @@ package org.apache.flink.table.store.utils;
/** Collection of simple mathematical routines. */
public class MathUtils {
+ /**
+ * Rounds the given value down to the closest power of two. If the
argument is a power of
+ * two, it remains unchanged.
+ *
+ * @param value The value to round down.
+ * @return The closest value that is a power of two and less than or equal
to the given value.
+ */
+ public static int roundDownToPowerOf2(int value) {
+ return Integer.highestOneBit(value);
+ }
+
/**
* Computes the logarithm of the given value to the base of 2. This method
throws an error, if
* the given argument is not a power of 2.
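A tiny sketch (mine) of the new helper; Integer.highestOneBit keeps only the highest set bit, which is exactly the round-down:

    import org.apache.flink.table.store.utils.MathUtils;

    public class RoundDownSketch {

        public static void main(String[] args) {
            System.out.println(MathUtils.roundDownToPowerOf2(1000)); // prints 512
            System.out.println(MathUtils.roundDownToPowerOf2(1024)); // prints 1024
        }
    }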
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
deleted file mode 100644
index 9960495e..00000000
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/SimpleReadBuffer.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Copyright 2015 LinkedIn Corp. All rights reserved.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- */
-
-package org.apache.flink.table.store.utils;
-
-/** A simple read buffer provides {@code readUnsignedByte} and position. */
-public final class SimpleReadBuffer {
-
- private int pos = 0;
- private final byte[] buf;
-
- public SimpleReadBuffer(byte[] data) {
- buf = data;
- }
-
- public byte[] getBuf() {
- return buf;
- }
-
- public int getPos() {
- return pos;
- }
-
- public SimpleReadBuffer reset() {
- pos = 0;
- return this;
- }
-
- public int readUnsignedByte() {
- return buf[pos++] & 0xff;
- }
-}
diff --git
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
index a6180a80..481a1597 100644
---
a/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
+++
b/flink-table-store-common/src/main/java/org/apache/flink/table/store/utils/VarLengthIntUtils.java
@@ -17,7 +17,6 @@ package org.apache.flink.table.store.utils;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
-import java.nio.ByteBuffer;
/** Utils for encoding int/long to var length bytes. */
public final class VarLengthIntUtils {
@@ -99,17 +98,6 @@ public final class VarLengthIntUtils {
return i;
}
- public static int decodeInt(SimpleReadBuffer is) {
- for (int offset = 0, result = 0; offset < 32; offset += 7) {
- int b = is.readUnsignedByte();
- result |= (b & 0x7F) << offset;
- if ((b & 0x80) == 0) {
- return result;
- }
- }
- throw new Error("Malformed integer.");
- }
-
public static int decodeInt(DataInput is) throws IOException {
for (int offset = 0, result = 0; offset < 32; offset += 7) {
int b = is.readUnsignedByte();
@@ -120,15 +108,4 @@ public final class VarLengthIntUtils {
}
throw new Error("Malformed integer.");
}
-
- public static int decodeInt(ByteBuffer bb) {
- for (int offset = 0, result = 0; offset < 32; offset += 7) {
- int b = bb.get() & 0xffff;
- result |= (b & 0x7F) << offset;
- if ((b & 0x80) == 0) {
- return result;
- }
- }
- throw new Error("Malformed integer.");
- }
}
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
new file mode 100644
index 00000000..e9e2f090
--- /dev/null
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/io/cache/CachedRandomInputViewTest.java
@@ -0,0 +1,106 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.io.cache;
+
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CachedRandomInputView}. */
+public class CachedRandomInputViewTest {
+
+ @TempDir Path tempDir;
+
+ private final ThreadLocalRandom rnd = ThreadLocalRandom.current();
+
+ @Test
+ public void testMatched() throws IOException {
+ innerTest(1024 * 512);
+ }
+
+ @Test
+ public void testNotMatched() throws IOException {
+ innerTest(131092);
+ }
+
+ @Test
+ public void testRandom() throws IOException {
+ innerTest(rnd.nextInt(5000, 100000));
+ }
+
+ private void innerTest(int len) throws IOException {
+ byte[] bytes = new byte[len];
+ MemorySegment segment = MemorySegment.wrap(bytes);
+ for (int i = 0; i < bytes.length; i++) {
+ bytes[i] = (byte) rnd.nextInt();
+ }
+
+ File file = writeFile(bytes);
+ CacheManager cacheManager = new CacheManager(1024,
MemorySize.ofKibiBytes(128));
+ CachedRandomInputView view = new CachedRandomInputView(file,
cacheManager);
+
+ // read first one
+ view.setReadPosition(0);
+ assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(0));
+
+ // read mid
+ int mid = bytes.length / 2;
+ view.setReadPosition(mid);
+ assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(mid));
+
+ // read special
+ view.setReadPosition(1021);
+ assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(1021));
+
+ // read last one
+ view.setReadPosition(bytes.length - 1);
+ assertThat(view.readByte()).isEqualTo(bytes[bytes.length - 1]);
+
+ // random read
+ for (int i = 0; i < 10000; i++) {
+ int position = rnd.nextInt(bytes.length - 8);
+ view.setReadPosition(position);
+
assertThat(view.readLong()).isEqualTo(segment.getLongBigEndian(position));
+ }
+
+ view.close();
+ assertThat(cacheManager.cache().size()).isEqualTo(0);
+ }
+
+ private File writeFile(byte[] bytes) throws IOException {
+ File file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+ if (!file.createNewFile()) {
+ throw new IOException("Can not create: " + file);
+ }
+ Files.write(file.toPath(), bytes, StandardOpenOption.WRITE);
+ return file;
+ }
+}
diff --git
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
index 6c76f425..fe058a94 100644
---
a/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
+++
b/flink-table-store-common/src/test/java/org/apache/flink/table/store/lookup/hash/HashLookupStoreFactoryTest.java
@@ -23,7 +23,9 @@
package org.apache.flink.table.store.lookup.hash;
import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.io.cache.CacheManager;
import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.MathUtils;
import org.apache.flink.table.store.utils.VarLengthIntUtils;
import org.apache.commons.math3.random.RandomDataGenerator;
@@ -42,6 +44,7 @@ import java.util.Map;
import java.util.Random;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ThreadLocalRandom;
import static org.assertj.core.api.Assertions.assertThat;
@@ -57,7 +60,9 @@ public class HashLookupStoreFactoryTest {
@BeforeEach
public void setUp() throws IOException {
- this.factory = new HashLookupStoreFactory(0.75d, true,
MemorySize.ofMebiBytes(1024));
+ this.factory =
+ new HashLookupStoreFactory(
+ new CacheManager(1024, MemorySize.ofMebiBytes(1)),
0.75d);
this.file = new File(tempDir.toFile(), UUID.randomUUID().toString());
if (!file.createNewFile()) {
throw new IOException("Can not create file: " + file);
@@ -171,7 +176,12 @@ public class HashLookupStoreFactoryTest {
writeStore(file, keys, values);
// Read
- factory = new HashLookupStoreFactory(0.75d, true, new
MemorySize(byteSize - 100));
+ factory =
+ new HashLookupStoreFactory(
+ new CacheManager(
+ MathUtils.roundDownToPowerOf2(byteSize - 100),
+ MemorySize.ofMebiBytes(1)),
+ 0.75d);
HashLookupStoreReader reader = factory.createReader(file);
for (int i = 0; i < keys.length; i++) {
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
@@ -197,7 +207,12 @@ public class HashLookupStoreFactoryTest {
writeStore(file, keys, values);
// Read
- factory = new HashLookupStoreFactory(0.75d, true, new
MemorySize(byteSize + sizeSize + 3));
+ factory =
+ new HashLookupStoreFactory(
+ new CacheManager(
+ MathUtils.roundDownToPowerOf2(byteSize +
sizeSize + 3),
+ MemorySize.ofMebiBytes(1)),
+ 0.75d);
HashLookupStoreReader reader = factory.createReader(file);
for (int i = 0; i < keys.length; i++) {
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
@@ -250,21 +265,25 @@ public class HashLookupStoreFactoryTest {
}
@Test
- public void testReadDisk() throws IOException {
- Integer[] keys = generateIntKeys(10000);
+ public void testCacheExpiration() throws IOException {
+ int len = 1000;
+ ThreadLocalRandom rnd = ThreadLocalRandom.current();
+ Object[] keys = new Object[len];
+ Object[] values = new Object[len];
+ for (int i = 0; i < len; i++) {
+ keys[i] = rnd.nextInt();
+ values[i] = generateStringData(100);
+ }
// Write
- Object[] values = generateStringData(keys.length, 1000);
writeStore(file, keys, values);
// Read
- factory = new HashLookupStoreFactory(0.75d, false,
MemorySize.ofMebiBytes(1024));
+ factory = new HashLookupStoreFactory(new CacheManager(1024, new
MemorySize(8096)), 0.75d);
HashLookupStoreReader reader = factory.createReader(file);
-
for (int i = 0; i < keys.length; i++) {
assertThat(reader.lookup(toBytes(keys[i]))).isEqualTo(toBytes(values[i]));
}
- reader.close();
}
@Test
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
index 89395df8..6cb9ebe1 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/CoreOptions.java
@@ -496,6 +496,34 @@ public class CoreOptions implements Serializable {
.withDescription(
"Define partition by table options, cannot define
partition on DDL and table options at the same time.");
+ public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
+ key("lookup.hash-load-factor")
+ .floatType()
+ .defaultValue(0.75F)
+ .withDescription("The index load factor for lookup.");
+
+ public static final ConfigOption<Duration> LOOKUP_CACHE_FILE_RETENTION =
+ key("lookup.cache-file-retention")
+ .durationType()
+ .defaultValue(Duration.ofHours(1))
+ .withDescription(
+ "The cached files retention time for lookup. After
the file expires,"
+ + " if there is a need for access, it will
be re-read from the DFS to build"
+ + " an index on the local disk.");
+
+ public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_DISK_SIZE =
+ key("lookup.cache-max-disk-size")
+ .memoryType()
+ .defaultValue(MemorySize.MAX_VALUE)
+ .withDescription(
+ "Max disk size for lookup cache, you can use this
option to limit the use of local disks.");
+
+ public static final ConfigOption<MemorySize> LOOKUP_CACHE_MAX_MEMORY_SIZE =
+ key("lookup.cache-max-memory-size")
+ .memoryType()
+ .defaultValue(MemorySize.parse("256 mb"))
+ .withDescription("Max memory size for lookup cache.");
+
private final Options options;
public CoreOptions(Map<String, String> options) {
@@ -885,7 +913,11 @@ public class CoreOptions implements Serializable {
"input",
"Double write to a changelog file when flushing memory table,
the changelog is from input."),
- FULL_COMPACTION("full-compaction", "Generate changelog files with each
full compaction.");
+ FULL_COMPACTION("full-compaction", "Generate changelog files with each
full compaction."),
+
+ LOOKUP(
+ "lookup",
+ "Generate changelog files through 'lookup' before committing
the data writing.");
private final String value;
private final String description;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
index 3777e27e..132995e0 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/append/AppendOnlyCompactManager.java
@@ -27,6 +27,7 @@ import
org.apache.flink.table.store.file.io.DataFilePathFactory;
import org.apache.flink.table.store.fs.FileIO;
import org.apache.flink.table.store.utils.Preconditions;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -177,6 +178,9 @@ public class AppendOnlyCompactManager extends
CompactFutureManager {
return toCompact;
}
+ @Override
+ public void close() throws IOException {}
+
/**
* A {@link CompactTask} impl for full compaction of append-only table.
*
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
index e82e1748..982405d4 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/CompactManager.java
@@ -20,11 +20,12 @@ package org.apache.flink.table.store.file.compact;
import org.apache.flink.table.store.file.io.DataFileMeta;
+import java.io.Closeable;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
/** Manager to submit compaction task. */
-public interface CompactManager {
+public interface CompactManager extends Closeable {
/** Should wait compaction finish. */
boolean shouldWaitCompaction();
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
index 195b59de..3aff478d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/compact/NoopCompactManager.java
@@ -22,6 +22,7 @@ import org.apache.flink.table.store.CoreOptions;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.utils.Preconditions;
+import java.io.IOException;
import java.util.Optional;
import java.util.concurrent.ExecutionException;
@@ -56,4 +57,7 @@ public class NoopCompactManager implements CompactManager {
@Override
public void cancelCompaction() {}
+
+ @Override
+ public void close() throws IOException {}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
new file mode 100644
index 00000000..c7b53d26
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/DataFileReader.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.utils.FileIOUtils;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/** Reader for a {@link DataFileMeta}. */
+public class DataFileReader implements Closeable {
+
+ private final File localFile;
+ private final DataFileMeta remoteFile;
+ private final LookupStoreReader reader;
+
+ public DataFileReader(File localFile, DataFileMeta remoteFile,
LookupStoreReader reader) {
+ this.localFile = localFile;
+ this.remoteFile = remoteFile;
+ this.reader = reader;
+ }
+
+ @Nullable
+ public byte[] get(byte[] key) throws IOException {
+ return reader.lookup(key);
+ }
+
+ public int fileKibiBytes() {
+ long kibiBytes = localFile.length() >> 10;
+ if (kibiBytes > Integer.MAX_VALUE) {
+ throw new RuntimeException(
+ "Lookup file is too big: " +
MemorySize.ofKibiBytes(kibiBytes));
+ }
+ return (int) kibiBytes;
+ }
+
+ public DataFileMeta remoteFile() {
+ return remoteFile;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ FileIOUtils.deleteFileOrDirectory(localFile);
+ }
+}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
index f295f456..3804e29b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/Levels.java
@@ -27,6 +27,7 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
@@ -42,6 +43,8 @@ public class Levels {
private final List<SortedRun> levels;
+ private final List<DropFileCallback> dropFileCallbacks = new ArrayList<>();
+
public Levels(
Comparator<InternalRow> keyComparator, List<DataFileMeta>
inputFiles, int numLevels) {
this.keyComparator = keyComparator;
@@ -83,6 +86,10 @@ public class Levels {
"Number of files stored in Levels does not equal to the size
of inputFiles. This is unexpected.");
}
+ public void addDropFileCallback(DropFileCallback callback) {
+ dropFileCallbacks.add(callback);
+ }
+
public void addLevel0File(DataFileMeta file) {
checkArgument(file.level() == 0);
level0.add(file);
@@ -148,6 +155,16 @@ public class Levels {
groupedBefore.getOrDefault(i, emptyList()),
groupedAfter.getOrDefault(i, emptyList()));
}
+
+ if (dropFileCallbacks.size() > 0) {
+ Set<String> droppedFiles =
+
before.stream().map(DataFileMeta::fileName).collect(Collectors.toSet());
+ // exclude upgrade files
+
after.stream().map(DataFileMeta::fileName).forEach(droppedFiles::remove);
+ for (DropFileCallback callback : dropFileCallbacks) {
+ droppedFiles.forEach(callback::notifyDropFile);
+ }
+ }
}
private void updateLevel(int level, List<DataFileMeta> before,
List<DataFileMeta> after) {
@@ -170,4 +187,10 @@ public class Levels {
return files.stream()
.collect(Collectors.groupingBy(DataFileMeta::level,
Collectors.toList()));
}
+
+    /** A callback to be notified when a file is dropped. */
+ public interface DropFileCallback {
+
+ void notifyDropFile(String file);
+ }
}
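
Illustrative sketch (plain strings, not part of the patch) of the drop notification computed in Levels.update above: files present before the update but absent afterwards are reported to registered DropFileCallbacks, while upgraded files, which keep their file names, are excluded. The file names f1..f4 are hypothetical.

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class DropFileCallbackSketch {
        public static void main(String[] args) {
            // hypothetical file names: files before the update and files produced by it;
            // "f2" is an upgraded file that keeps its name, so it must not be reported as dropped
            Set<String> droppedFiles = new HashSet<>(Arrays.asList("f1", "f2", "f3"));
            Set<String> after = new HashSet<>(Arrays.asList("f2", "f4"));
            // exclude upgrade files, as Levels.update does
            after.forEach(droppedFiles::remove);
            // a registered DropFileCallback (e.g. LookupLevels) is then notified for f1 and f3
            droppedFiles.forEach(f -> System.out.println("notifyDropFile(" + f + ")"));
        }
    }
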
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
new file mode 100644
index 00000000..cc50f718
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/LookupLevels.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.annotation.VisibleForTesting;
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.io.DataOutputSerializer;
+import org.apache.flink.table.store.lookup.LookupStoreFactory;
+import org.apache.flink.table.store.lookup.LookupStoreReader;
+import org.apache.flink.table.store.lookup.LookupStoreWriter;
+import org.apache.flink.table.store.memory.MemorySegment;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.reader.RecordReader;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.utils.FileIOUtils;
+import org.apache.flink.table.store.utils.IOFunction;
+
+import org.apache.flink.shaded.guava30.com.google.common.cache.Cache;
+import org.apache.flink.shaded.guava30.com.google.common.cache.CacheBuilder;
+import
org.apache.flink.shaded.guava30.com.google.common.cache.RemovalNotification;
+
+import javax.annotation.Nullable;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.UncheckedIOException;
+import java.time.Duration;
+import java.util.Comparator;
+import java.util.List;
+import java.util.concurrent.ExecutionException;
+import java.util.function.Supplier;
+
+import static org.apache.flink.table.store.options.ConfigOptions.key;
+
+/** Provides lookup by key across levels. */
+public class LookupLevels implements Levels.DropFileCallback, Closeable {
+
+ private final Levels levels;
+ private final Comparator<InternalRow> keyComparator;
+ private final InternalRowSerializer keySerializer;
+ private final InternalRowSerializer valueSerializer;
+ private final IOFunction<DataFileMeta, RecordReader<KeyValue>>
fileReaderFactory;
+ private final Supplier<File> localFileFactory;
+ private final LookupStoreFactory lookupStoreFactory;
+
+ private final Cache<String, LookupFile> lookupFiles;
+
+ public LookupLevels(
+ Levels levels,
+ Comparator<InternalRow> keyComparator,
+ InternalRowSerializer keySerializer,
+ InternalRowSerializer valueSerializer,
+ IOFunction<DataFileMeta, RecordReader<KeyValue>> fileReaderFactory,
+ Supplier<File> localFileFactory,
+ LookupStoreFactory lookupStoreFactory,
+ Duration fileRetention,
+ MemorySize maxDiskSize) {
+ this.levels = levels;
+ this.keyComparator = keyComparator;
+ this.keySerializer = keySerializer;
+ this.valueSerializer = valueSerializer;
+ this.fileReaderFactory = fileReaderFactory;
+ this.localFileFactory = localFileFactory;
+ this.lookupStoreFactory = lookupStoreFactory;
+ this.lookupFiles =
+ CacheBuilder.newBuilder()
+ .expireAfterAccess(fileRetention)
+ .maximumWeight(maxDiskSize.getKibiBytes())
+ .weigher(this::fileWeigh)
+ .removalListener(this::removalCallback)
+ .build();
+ levels.addDropFileCallback(this);
+ }
+
+ @VisibleForTesting
+ Cache<String, LookupFile> lookupFiles() {
+ return lookupFiles;
+ }
+
+ @Override
+ public void notifyDropFile(String file) {
+ lookupFiles.invalidate(file);
+ }
+
+ @Nullable
+ public KeyValue lookup(InternalRow key, int startLevel) throws IOException
{
+ if (startLevel == 0) {
+ throw new IllegalArgumentException("Start level can not be zero.");
+ }
+
+ KeyValue kv = null;
+ for (int i = startLevel; i < levels.numberOfLevels(); i++) {
+ SortedRun level = levels.runOfLevel(i);
+ kv = lookup(key, level);
+ if (kv != null) {
+ break;
+ }
+ }
+
+ return kv;
+ }
+
+ @Nullable
+ private KeyValue lookup(InternalRow target, SortedRun level) throws
IOException {
+ List<DataFileMeta> files = level.files();
+ int left = 0;
+ int right = files.size() - 1;
+
+        // binary search the files to find the first file whose max key is not smaller than
+        // the target key
+ while (left < right) {
+ int mid = (left + right) / 2;
+
+ if (keyComparator.compare(files.get(mid).maxKey(), target) < 0) {
+ // Key at "mid.max" is < "target". Therefore all
+ // files at or before "mid" are uninteresting.
+ left = mid + 1;
+ } else {
+ // Key at "mid.max" is >= "target". Therefore all files
+ // after "mid" are uninteresting.
+ right = mid;
+ }
+ }
+
+ int index = right;
+
+        // if the index is now pointing to the last file, check if its largest key is smaller
+        // than the target key. If so, the target is beyond the end of this level
+ if (index == files.size() - 1
+ && keyComparator.compare(files.get(index).maxKey(), target) <
0) {
+ index++;
+ }
+
+        // if there is no such file, the key does not exist in this level
+ return index < files.size() ? lookup(target, files.get(index)) : null;
+ }
+
+ @Nullable
+ private KeyValue lookup(InternalRow key, DataFileMeta file) throws
IOException {
+ LookupFile lookupFile;
+ try {
+ lookupFile = lookupFiles.get(file.fileName(), () ->
createLookupFile(file));
+ } catch (ExecutionException e) {
+ throw new IOException(e);
+ }
+ byte[] keyBytes = keySerializer.toBinaryRow(key).toBytes();
+ byte[] valueBytes = lookupFile.get(keyBytes);
+ if (valueBytes == null) {
+ return null;
+ }
+ MemorySegment memorySegment = MemorySegment.wrap(valueBytes);
+ long sequenceNumber = memorySegment.getLong(0);
+ RowKind rowKind = RowKind.fromByteValue(valueBytes[8]);
+ BinaryRow value = new BinaryRow(valueSerializer.getArity());
+ value.pointTo(memorySegment, 9, valueBytes.length - 9);
+ return new KeyValue()
+ .replace(key, sequenceNumber, rowKind, value)
+ .setLevel(lookupFile.remoteFile().level());
+ }
+
+ private int fileWeigh(String file, LookupFile lookupFile) {
+ return lookupFile.fileKibiBytes();
+ }
+
+ private void removalCallback(RemovalNotification<String, LookupFile>
notification) {
+ LookupFile reader = notification.getValue();
+ if (reader != null) {
+ try {
+ reader.close();
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ }
+ }
+
+ private LookupFile createLookupFile(DataFileMeta file) throws IOException {
+ File localFile = localFileFactory.get();
+ if (!localFile.createNewFile()) {
+ throw new IOException("Can not create new file: " + localFile);
+ }
+ try (LookupStoreWriter kvWriter =
lookupStoreFactory.createWriter(localFile);
+ RecordReader<KeyValue> reader = fileReaderFactory.apply(file))
{
+ DataOutputSerializer valueOut = new DataOutputSerializer(32);
+ RecordReader.RecordIterator<KeyValue> batch;
+ KeyValue kv;
+ while ((batch = reader.readBatch()) != null) {
+ while ((kv = batch.next()) != null) {
+ byte[] keyBytes =
keySerializer.toBinaryRow(kv.key()).toBytes();
+ valueOut.clear();
+ valueOut.writeLong(kv.sequenceNumber());
+ valueOut.writeByte(kv.valueKind().toByteValue());
+
valueOut.write(valueSerializer.toBinaryRow(kv.value()).toBytes());
+ byte[] valueBytes = valueOut.getCopyOfBuffer();
+ kvWriter.put(keyBytes, valueBytes);
+ }
+ batch.releaseBatch();
+ }
+ } catch (IOException e) {
+ FileIOUtils.deleteFileOrDirectory(localFile);
+ throw e;
+ }
+
+ return new LookupFile(localFile, file,
lookupStoreFactory.createReader(localFile));
+ }
+
+ @Override
+ public void close() throws IOException {
+ lookupFiles.invalidateAll();
+ }
+
+ private static class LookupFile implements Closeable {
+
+ private final File localFile;
+ private final DataFileMeta remoteFile;
+ private final LookupStoreReader reader;
+
+ public LookupFile(File localFile, DataFileMeta remoteFile,
LookupStoreReader reader) {
+ this.localFile = localFile;
+ this.remoteFile = remoteFile;
+ this.reader = reader;
+ }
+
+ @Nullable
+ public byte[] get(byte[] key) throws IOException {
+ return reader.lookup(key);
+ }
+
+ public int fileKibiBytes() {
+ long kibiBytes = localFile.length() >> 10;
+ if (kibiBytes > Integer.MAX_VALUE) {
+ throw new RuntimeException(
+ "Lookup file is too big: " +
MemorySize.ofKibiBytes(kibiBytes));
+ }
+ return (int) kibiBytes;
+ }
+
+ public DataFileMeta remoteFile() {
+ return remoteFile;
+ }
+
+ @Override
+ public void close() throws IOException {
+ reader.close();
+ FileIOUtils.deleteFileOrDirectory(localFile);
+ }
+ }
+}
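
Illustrative sketch of the value layout that createLookupFile writes and lookup reads back: an 8-byte sequence number, a 1-byte row kind, then the serialized value row. Plain java.nio.ByteBuffer stands in for the project's DataOutputSerializer and MemorySegment, so byte-order details are simplified; the numbers are made up.

    import java.nio.ByteBuffer;

    public class LookupValueLayoutSketch {
        // encode: sequence number (8 bytes) + row kind (1 byte) + serialized value row
        static byte[] encode(long sequenceNumber, byte rowKindByte, byte[] valueRowBytes) {
            ByteBuffer buf = ByteBuffer.allocate(8 + 1 + valueRowBytes.length);
            buf.putLong(sequenceNumber);
            buf.put(rowKindByte);
            buf.put(valueRowBytes);
            return buf.array();
        }

        public static void main(String[] args) {
            byte[] record = encode(42L, (byte) 0, new byte[] {1, 2, 3});
            // decode mirrors LookupLevels.lookup: offsets 0, 8 and 9
            long sequenceNumber = ByteBuffer.wrap(record).getLong(0);
            byte rowKindByte = record[8];
            int valueLength = record.length - 9;
            System.out.println(sequenceNumber + " / kind " + rowKindByte + " / " + valueLength + " value bytes");
        }
    }
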
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
index b159b23e..28a1f742 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/MergeTreeWriter.java
@@ -277,6 +277,7 @@ public class MergeTreeWriter implements
RecordWriter<KeyValue>, MemoryOwner {
// cancel compaction so that it does not block job cancelling
compactManager.cancelCompaction();
sync();
+ compactManager.close();
// delete temporary files
List<DataFileMeta> delete = new ArrayList<>(newFiles);
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
index 9881b0a5..8bff9e0f 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/AbstractCompactRewriter.java
@@ -22,6 +22,7 @@ import
org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.SortedRun;
+import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;
@@ -41,4 +42,7 @@ public abstract class AbstractCompactRewriter implements
CompactRewriter {
.flatMap(Collection::stream)
.collect(Collectors.toList());
}
+
+ @Override
+ public void close() throws IOException {}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
index d2b62bd1..dd6cc8dd 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/CompactRewriter.java
@@ -22,10 +22,11 @@ import
org.apache.flink.table.store.file.compact.CompactResult;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.mergetree.SortedRun;
+import java.io.Closeable;
import java.util.List;
/** Rewrite sections to the files. */
-public interface CompactRewriter {
+public interface CompactRewriter extends Closeable {
CompactResult rewrite(int outputLevel, boolean dropDelete,
List<List<SortedRun>> sections)
throws Exception;
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
index 6fc064c4..5bbe3571 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
@@ -26,6 +26,7 @@ import
org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
import org.apache.flink.table.store.file.mergetree.SortedRun;
import org.apache.flink.table.store.utils.Preconditions;
+import java.io.IOException;
import java.util.Comparator;
import java.util.List;
@@ -65,4 +66,7 @@ public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRew
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
return new FullChangelogMergeFunctionWrapper(mfFactory.create(),
maxLevel);
}
+
+ @Override
+ public void close() throws IOException {}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
new file mode 100644
index 00000000..acab8c3d
--- /dev/null
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupCompaction.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree.compact;
+
+import org.apache.flink.table.store.file.compact.CompactUnit;
+import org.apache.flink.table.store.file.mergetree.LevelSortedRun;
+
+import java.util.List;
+import java.util.Optional;
+
+/** A {@link CompactStrategy} to force compacting level 0 files. */
+public class LookupCompaction implements CompactStrategy {
+
+ private final UniversalCompaction universalCompaction;
+
+ public LookupCompaction(UniversalCompaction universalCompaction) {
+ this.universalCompaction = universalCompaction;
+ }
+
+ @Override
+ public Optional<CompactUnit> pick(int numLevels, List<LevelSortedRun>
runs) {
+ Optional<CompactUnit> pick = universalCompaction.pick(numLevels, runs);
+ if (pick.isPresent()) {
+ return pick;
+ }
+
+ if (runs.isEmpty() || runs.get(0).level() > 0) {
+ return Optional.empty();
+ }
+
+ // collect all level 0 files
+ int candidateCount = 1;
+ for (int i = candidateCount; i < runs.size(); i++) {
+ if (runs.get(i).level() > 0) {
+ break;
+ }
+ candidateCount++;
+ }
+
+ return Optional.of(
+ universalCompaction.pickForSizeRatio(numLevels - 1, runs,
candidateCount, true));
+ }
+}
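
Illustrative sketch of the forced level-0 pick above, using plain integer run levels instead of LevelSortedRun: when the wrapped UniversalCompaction picks nothing, the leading level-0 runs are counted and compacted together.

    import java.util.Arrays;
    import java.util.List;

    public class LookupCompactionSketch {
        // mirrors the candidate-counting loop in LookupCompaction.pick on plain run levels
        static int leadingLevel0Runs(List<Integer> runLevels) {
            if (runLevels.isEmpty() || runLevels.get(0) > 0) {
                return 0;
            }
            int candidateCount = 1;
            for (int i = candidateCount; i < runLevels.size(); i++) {
                if (runLevels.get(i) > 0) {
                    break;
                }
                candidateCount++;
            }
            return candidateCount;
        }

        public static void main(String[] args) {
            // three level-0 runs in front of runs at level 2 and 3 -> all three are forced into the pick
            System.out.println(leadingLevel0Runs(Arrays.asList(0, 0, 0, 2, 3))); // 3
            System.out.println(leadingLevel0Runs(Arrays.asList(2, 3)));          // 0
        }
    }
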
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
similarity index 58%
copy from
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
copy to
flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
index 6fc064c4..ce3fdbcf 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/FullChangelogMergeTreeCompactRewriter.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/LookupMergeTreeCompactRewriter.java
@@ -23,46 +23,71 @@ import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
+import org.apache.flink.table.store.file.mergetree.LookupLevels;
import org.apache.flink.table.store.file.mergetree.SortedRun;
-import org.apache.flink.table.store.utils.Preconditions;
+import java.io.IOException;
+import java.io.UncheckedIOException;
import java.util.Comparator;
import java.util.List;
-/** A {@link MergeTreeCompactRewriter} which produces changelog files for each
full compaction. */
-public class FullChangelogMergeTreeCompactRewriter extends
ChangelogMergeTreeRewriter {
+/**
+ * A {@link MergeTreeCompactRewriter} which produces changelog files via lookup for
+ * compactions involving level 0 files.
+ */
+public class LookupMergeTreeCompactRewriter extends ChangelogMergeTreeRewriter
{
- private final int maxLevel;
+ private final LookupLevels lookupLevels;
- public FullChangelogMergeTreeCompactRewriter(
- int maxLevel,
+ public LookupMergeTreeCompactRewriter(
+ LookupLevels lookupLevels,
KeyValueFileReaderFactory readerFactory,
KeyValueFileWriterFactory writerFactory,
Comparator<InternalRow> keyComparator,
MergeFunctionFactory<KeyValue> mfFactory) {
super(readerFactory, writerFactory, keyComparator, mfFactory);
- this.maxLevel = maxLevel;
+ this.lookupLevels = lookupLevels;
}
@Override
protected boolean rewriteChangelog(
int outputLevel, boolean dropDelete, List<List<SortedRun>>
sections) {
- boolean changelog = outputLevel == maxLevel;
- if (changelog) {
- Preconditions.checkArgument(
- dropDelete,
- "Delete records should be dropped from result of full
compaction. This is unexpected.");
+ if (outputLevel == 0) {
+ return false;
+ }
+
+ for (List<SortedRun> runs : sections) {
+ for (SortedRun run : runs) {
+ for (DataFileMeta file : run.files()) {
+ if (file.level() == 0) {
+ return true;
+ }
+ }
+ }
}
- return changelog;
+ return false;
}
@Override
protected boolean upgradeChangelog(int outputLevel, DataFileMeta file) {
- return outputLevel == maxLevel;
+ return file.level() == 0;
}
@Override
protected MergeFunctionWrapper<ChangelogResult> createMergeWrapper(int
outputLevel) {
- return new FullChangelogMergeFunctionWrapper(mfFactory.create(),
maxLevel);
+ return new LookupChangelogMergeFunctionWrapper(
+ mfFactory,
+ key -> {
+ try {
+ return lookupLevels.lookup(key, outputLevel + 1);
+ } catch (IOException e) {
+ throw new UncheckedIOException(e);
+ }
+ });
+ }
+
+ @Override
+ public void close() throws IOException {
+ lookupLevels.close();
}
}
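
Illustrative sketch of the changelog decision above, with plain integer file levels instead of SortedRun sections (runs flattened for brevity): the lookup rewriter emits changelog only when the output level is above 0 and at least one input file comes from level 0.

    import java.util.Arrays;
    import java.util.List;

    public class LookupRewriteDecisionSketch {
        // mirrors LookupMergeTreeCompactRewriter.rewriteChangelog on plain file levels
        static boolean rewriteChangelog(int outputLevel, List<List<Integer>> sectionFileLevels) {
            if (outputLevel == 0) {
                return false;
            }
            return sectionFileLevels.stream().flatMap(List::stream).anyMatch(level -> level == 0);
        }

        public static void main(String[] args) {
            System.out.println(rewriteChangelog(1, Arrays.asList(Arrays.asList(0, 1)))); // true
            System.out.println(rewriteChangelog(2, Arrays.asList(Arrays.asList(1, 2)))); // false
        }
    }
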
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
index 99a09be0..58c02c4c 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/MergeTreeCompactManager.java
@@ -31,6 +31,7 @@ import org.apache.flink.table.store.utils.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.IOException;
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
@@ -185,4 +186,9 @@ public class MergeTreeCompactManager extends
CompactFutureManager {
});
return result;
}
+
+ @Override
+ public void close() throws IOException {
+ rewriter.close();
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
index 933c7a4b..91e380ee 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/mergetree/compact/UniversalCompaction.java
@@ -162,6 +162,7 @@ public class UniversalCompaction implements CompactStrategy
{
if (runCount == runs.size()) {
outputLevel = maxLevel;
} else {
+ // level of next run - 1
outputLevel = Math.max(0, runs.get(runCount).level() - 1);
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
index 12a160b1..23cbbe6d 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreWrite.java
@@ -19,8 +19,10 @@
package org.apache.flink.table.store.file.operation;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.data.BinaryRow;
import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.compact.CompactManager;
import org.apache.flink.table.store.file.compact.NoopCompactManager;
@@ -28,10 +30,13 @@ import org.apache.flink.table.store.file.io.DataFileMeta;
import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
import org.apache.flink.table.store.file.mergetree.Levels;
+import org.apache.flink.table.store.file.mergetree.LookupLevels;
import org.apache.flink.table.store.file.mergetree.MergeTreeWriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactRewriter;
import org.apache.flink.table.store.file.mergetree.compact.CompactStrategy;
import
org.apache.flink.table.store.file.mergetree.compact.FullChangelogMergeTreeCompactRewriter;
+import org.apache.flink.table.store.file.mergetree.compact.LookupCompaction;
+import
org.apache.flink.table.store.file.mergetree.compact.LookupMergeTreeCompactRewriter;
import
org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactManager;
import
org.apache.flink.table.store.file.mergetree.compact.MergeTreeCompactRewriter;
@@ -43,6 +48,7 @@ import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.format.FileFormatDiscover;
import org.apache.flink.table.store.fs.FileIO;
+import org.apache.flink.table.store.lookup.hash.HashLookupStoreFactory;
import org.apache.flink.table.store.types.RowType;
import org.slf4j.Logger;
@@ -67,6 +73,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
private final MergeFunctionFactory<KeyValue> mfFactory;
private final CoreOptions options;
private final FileIO fileIO;
+ private final RowType keyType;
+ private final RowType valueType;
public KeyValueFileStoreWrite(
FileIO fileIO,
@@ -84,6 +92,8 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
KeyValueFieldsExtractor extractor) {
super(commitUser, snapshotManager, scan, options);
this.fileIO = fileIO;
+ this.keyType = keyType;
+ this.valueType = valueType;
this.readerFactoryBuilder =
KeyValueFileReaderFactory.builder(
fileIO,
@@ -147,17 +157,18 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
writerFactoryBuilder.build(partition, bucket,
options.fileCompressionPerLevel());
Comparator<InternalRow> keyComparator = keyComparatorSupplier.get();
Levels levels = new Levels(keyComparator, restoreFiles,
options.numLevels());
+ UniversalCompaction universalCompaction =
+ new UniversalCompaction(
+ options.maxSizeAmplificationPercent(),
+ options.sortedRunSizeRatio(),
+ options.numSortedRunCompactionTrigger(),
+ options.maxSortedRunNum());
+ CompactStrategy compactStrategy =
+ options.changelogProducer() == ChangelogProducer.LOOKUP
+ ? new LookupCompaction(universalCompaction)
+ : universalCompaction;
CompactManager compactManager =
- createCompactManager(
- partition,
- bucket,
- new UniversalCompaction(
- options.maxSizeAmplificationPercent(),
- options.sortedRunSizeRatio(),
- options.numSortedRunCompactionTrigger(),
- options.maxSortedRunNum()),
- compactExecutor,
- levels);
+ createCompactManager(partition, bucket, compactStrategy,
compactExecutor, levels);
return new MergeTreeWriter(
bufferSpillable(),
options.localSortMaxNumFileHandles(),
@@ -185,7 +196,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
return new NoopCompactManager();
} else {
Comparator<InternalRow> keyComparator =
keyComparatorSupplier.get();
- CompactRewriter rewriter = createRewriter(partition, bucket,
keyComparator);
+ CompactRewriter rewriter = createRewriter(partition, bucket,
keyComparator, levels);
return new MergeTreeCompactManager(
compactExecutor,
levels,
@@ -198,21 +209,47 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
}
private MergeTreeCompactRewriter createRewriter(
- BinaryRow partition, int bucket, Comparator<InternalRow>
keyComparator) {
+ BinaryRow partition, int bucket, Comparator<InternalRow>
keyComparator, Levels levels) {
KeyValueFileReaderFactory readerFactory =
readerFactoryBuilder.build(partition, bucket);
KeyValueFileWriterFactory writerFactory =
writerFactoryBuilder.build(partition, bucket,
options.fileCompressionPerLevel());
+ switch (options.changelogProducer()) {
+ case FULL_COMPACTION:
+ return new FullChangelogMergeTreeCompactRewriter(
+ options.numLevels() - 1,
+ readerFactory,
+ writerFactory,
+ keyComparator,
+ mfFactory);
+ case LOOKUP:
+ LookupLevels lookupLevels = createLookupLevels(levels,
readerFactory);
+ return new LookupMergeTreeCompactRewriter(
+ lookupLevels, readerFactory, writerFactory,
keyComparator, mfFactory);
+ default:
+ return new MergeTreeCompactRewriter(
+ readerFactory, writerFactory, keyComparator,
mfFactory);
+ }
+ }
- if (options.changelogProducer() ==
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
- return new FullChangelogMergeTreeCompactRewriter(
- options.numLevels() - 1,
- readerFactory,
- writerFactory,
- keyComparator,
- mfFactory);
- } else {
- return new MergeTreeCompactRewriter(
- readerFactory, writerFactory, keyComparator, mfFactory);
+ private LookupLevels createLookupLevels(
+ Levels levels, KeyValueFileReaderFactory readerFactory) {
+ if (ioManager == null) {
+ throw new RuntimeException(
+ "Can not use lookup, there is no temp disk directory to
use.");
}
+ return new LookupLevels(
+ levels,
+ keyComparatorSupplier.get(),
+ new InternalRowSerializer(keyType),
+ new InternalRowSerializer(valueType),
+ file ->
+ readerFactory.createRecordReader(
+ file.schemaId(), file.fileName(),
file.level()),
+ () -> ioManager.createChannel().getPathFile(),
+ new HashLookupStoreFactory(
+ cacheManager,
+
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR)),
+
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
+
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE));
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
index 65788b4f..1032032c 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/MemoryFileStoreWrite.java
@@ -24,12 +24,15 @@ import org.apache.flink.table.store.file.memory.MemoryOwner;
import org.apache.flink.table.store.file.memory.MemoryPoolFactory;
import org.apache.flink.table.store.file.utils.RecordWriter;
import org.apache.flink.table.store.file.utils.SnapshotManager;
+import org.apache.flink.table.store.io.cache.CacheManager;
import org.apache.flink.shaded.guava30.com.google.common.collect.Iterators;
import java.util.Iterator;
import java.util.Map;
+import static
org.apache.flink.table.store.CoreOptions.LOOKUP_CACHE_MAX_MEMORY_SIZE;
+
/**
* Base {@link FileStoreWrite} implementation which supports using shared
memory and preempting
* memory from other writers.
@@ -37,7 +40,9 @@ import java.util.Map;
* @param <T> type of record to write.
*/
public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T> {
- private final MemoryPoolFactory memoryPoolFactory;
+
+ private final MemoryPoolFactory writeBufferPool;
+ protected final CacheManager cacheManager;
public MemoryFileStoreWrite(
String commitUser,
@@ -47,7 +52,11 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
super(commitUser, snapshotManager, scan);
HeapMemorySegmentPool memoryPool =
new HeapMemorySegmentPool(options.writeBufferSize(),
options.pageSize());
- this.memoryPoolFactory = new MemoryPoolFactory(memoryPool,
this::memoryOwners);
+ this.writeBufferPool = new MemoryPoolFactory(memoryPool,
this::memoryOwners);
+ this.cacheManager =
+ new CacheManager(
+ options.pageSize(),
+
options.toConfiguration().get(LOOKUP_CACHE_MAX_MEMORY_SIZE));
}
private Iterator<MemoryOwner> memoryOwners() {
@@ -79,6 +88,6 @@ public abstract class MemoryFileStoreWrite<T> extends
AbstractFileStoreWrite<T>
+ " but this is: "
+ writer.getClass());
}
- memoryPoolFactory.notifyNewOwner((MemoryOwner) writer);
+ writeBufferPool.notifyNewOwner((MemoryOwner) writer);
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
index 4e6481d8..c4dc7878 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/SchemaValidation.java
@@ -85,12 +85,19 @@ public class SchemaValidation {
+ " should not be larger than "
+ SNAPSHOT_NUM_RETAINED_MAX.key());
- // Only changelog tables with primary keys support full compaction
- if (options.changelogProducer() ==
CoreOptions.ChangelogProducer.FULL_COMPACTION
- && options.writeMode() == WriteMode.CHANGE_LOG
- && schema.primaryKeys().isEmpty()) {
- throw new UnsupportedOperationException(
- "Changelog table with full compaction must have primary
keys");
+ // Only changelog tables with primary keys support full compaction or
lookup changelog
+ // producer
+ if (options.writeMode() == WriteMode.CHANGE_LOG) {
+ switch (options.changelogProducer()) {
+ case FULL_COMPACTION:
+ case LOOKUP:
+ if (schema.primaryKeys().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "Changelog table with full compaction must
have primary keys");
+ }
+ break;
+ default:
+ }
}
// Check column names in schema
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index db23df03..552f2f77 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -19,11 +19,13 @@
package org.apache.flink.table.store.table;
import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.CoreOptions.ChangelogProducer;
import org.apache.flink.table.store.data.InternalRow;
import org.apache.flink.table.store.file.KeyValue;
import org.apache.flink.table.store.file.KeyValueFileStore;
import org.apache.flink.table.store.file.WriteMode;
import
org.apache.flink.table.store.file.mergetree.compact.DeduplicateMergeFunction;
+import org.apache.flink.table.store.file.mergetree.compact.LookupMergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.MergeFunctionFactory;
import
org.apache.flink.table.store.file.mergetree.compact.PartialUpdateMergeFunction;
import
org.apache.flink.table.store.file.mergetree.compact.aggregate.AggregateMergeFunction;
@@ -77,7 +79,8 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
if (lazyStore == null) {
RowType rowType = tableSchema.logicalRowType();
Options conf = Options.fromMap(tableSchema.options());
- CoreOptions.MergeEngine mergeEngine =
conf.get(CoreOptions.MERGE_ENGINE);
+ CoreOptions options = new CoreOptions(conf);
+ CoreOptions.MergeEngine mergeEngine = options.mergeEngine();
MergeFunctionFactory<KeyValue> mfFactory;
switch (mergeEngine) {
case DEDUPLICATE:
@@ -102,7 +105,10 @@ public class ChangelogWithKeyFileStoreTable extends
AbstractFileStoreTable {
"Unsupported merge engine: " + mergeEngine);
}
- CoreOptions options = new CoreOptions(conf);
+ if (options.changelogProducer() == ChangelogProducer.LOOKUP) {
+ mfFactory = LookupMergeFunction.wrap(mfFactory);
+ }
+
KeyValueFieldsExtractor extractor =
ChangelogWithKeyKeyValueFieldsExtractor.EXTRACTOR;
lazyStore =
new KeyValueFileStore(
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
index b92ab4ec..46922c4b 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScan.java
@@ -29,8 +29,6 @@ import javax.annotation.Nullable;
import java.io.Serializable;
import java.util.HashMap;
-import static
org.apache.flink.table.store.CoreOptions.ChangelogProducer.FULL_COMPACTION;
-
/** {@link DataTableScan} for streaming planning. */
public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan {
@@ -52,13 +50,16 @@ public interface StreamDataTableScan extends DataTableScan,
InnerStreamTableScan
put(CoreOptions.MergeEngine.AGGREGATE,
"Pre-aggregate");
}
};
- if (schema.primaryKeys().size() > 0
- && mergeEngineDesc.containsKey(mergeEngine)
- && options.changelogProducer() != FULL_COMPACTION) {
- throw new RuntimeException(
- mergeEngineDesc.get(mergeEngine)
- + " continuous reading is not supported. "
- + "You can use full compaction changelog producer
to support streaming reading.");
+ if (schema.primaryKeys().size() > 0 &&
mergeEngineDesc.containsKey(mergeEngine)) {
+ switch (options.changelogProducer()) {
+ case NONE:
+ case INPUT:
+ throw new RuntimeException(
+ mergeEngineDesc.get(mergeEngine)
+ + " continuous reading is not supported.
You can use "
+ + "'lookup' or 'full-compaction' changelog
producer to support streaming reading.");
+ default:
+ }
}
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
index de8d2f0b..c0e61caa 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/StreamDataTableScanImpl.java
@@ -149,18 +149,26 @@ public class StreamDataTableScanImpl extends
AbstractDataTableScan implements St
private FollowUpScanner createFollowUpScanner() {
CoreOptions.ChangelogProducer changelogProducer =
options.changelogProducer();
FollowUpScanner followUpScanner;
- if (changelogProducer == CoreOptions.ChangelogProducer.NONE) {
- followUpScanner = new DeltaFollowUpScanner();
- } else if (changelogProducer == CoreOptions.ChangelogProducer.INPUT) {
- followUpScanner = new InputChangelogFollowUpScanner();
- } else if (changelogProducer ==
CoreOptions.ChangelogProducer.FULL_COMPACTION) {
- // this change in data split reader will affect both starting
scanner and follow-up
- // scanner
- snapshotSplitReader.withLevelFilter(level -> level ==
options.numLevels() - 1);
- followUpScanner = new CompactionChangelogFollowUpScanner();
- } else {
- throw new UnsupportedOperationException(
- "Unknown changelog producer " + changelogProducer.name());
+ switch (changelogProducer) {
+ case NONE:
+ followUpScanner = new DeltaFollowUpScanner();
+ break;
+ case INPUT:
+ followUpScanner = new InputChangelogFollowUpScanner();
+ break;
+ case FULL_COMPACTION:
+                // this change in data split reader will affect both starting scanner and follow-up scanner
+ snapshotSplitReader.withLevelFilter(level -> level ==
options.numLevels() - 1);
+ followUpScanner = new CompactionChangelogFollowUpScanner();
+ break;
+ case LOOKUP:
+                // this change in data split reader will affect both starting scanner and follow-up scanner
+ snapshotSplitReader.withLevelFilter(level -> level > 0);
+ followUpScanner = new CompactionChangelogFollowUpScanner();
+ break;
+ default:
+ throw new UnsupportedOperationException(
+ "Unknown changelog producer " +
changelogProducer.name());
}
Long boundedWatermark = options.scanBoundedWatermark();
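
Illustrative sketch contrasting the two level filters set above (numLevels is a hypothetical value): the full-compaction producer only reads the max level, while the lookup producer reads every level above 0.

    import java.util.function.IntPredicate;

    public class ChangelogLevelFilterSketch {
        public static void main(String[] args) {
            int numLevels = 5; // hypothetical number of levels
            IntPredicate fullCompactionFilter = level -> level == numLevels - 1;
            IntPredicate lookupFilter = level -> level > 0;

            // level 2 files are visible to the lookup producer but not to full-compaction
            System.out.println(fullCompactionFilter.test(2)); // false
            System.out.println(lookupFilter.test(2));         // true
            // both accept the max level
            System.out.println(fullCompactionFilter.test(4)); // true
            System.out.println(lookupFilter.test(4));         // true
        }
    }
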
diff --git
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
new file mode 100644
index 00000000..d68d9e05
--- /dev/null
+++
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/mergetree/LookupLevelsTest.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.mergetree;
+
+import org.apache.flink.table.store.data.BinaryRow;
+import org.apache.flink.table.store.data.GenericRow;
+import org.apache.flink.table.store.data.InternalRow;
+import org.apache.flink.table.store.data.serializer.InternalRowSerializer;
+import org.apache.flink.table.store.file.KeyValue;
+import org.apache.flink.table.store.file.format.FlushingFileFormat;
+import org.apache.flink.table.store.file.io.DataFileMeta;
+import org.apache.flink.table.store.file.io.KeyValueFileReaderFactory;
+import org.apache.flink.table.store.file.io.KeyValueFileWriterFactory;
+import org.apache.flink.table.store.file.io.RollingFileWriter;
+import org.apache.flink.table.store.file.schema.KeyValueFieldsExtractor;
+import org.apache.flink.table.store.file.schema.SchemaManager;
+import org.apache.flink.table.store.file.schema.TableSchema;
+import org.apache.flink.table.store.file.utils.FileStorePathFactory;
+import org.apache.flink.table.store.fs.FileIOFinder;
+import org.apache.flink.table.store.fs.Path;
+import org.apache.flink.table.store.io.cache.CacheManager;
+import org.apache.flink.table.store.lookup.hash.HashLookupStoreFactory;
+import org.apache.flink.table.store.options.MemorySize;
+import org.apache.flink.table.store.table.SchemaEvolutionTableTestBase;
+import org.apache.flink.table.store.types.DataField;
+import org.apache.flink.table.store.types.DataTypes;
+import org.apache.flink.table.store.types.RowKind;
+import org.apache.flink.table.store.types.RowType;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.flink.table.store.CoreOptions.TARGET_FILE_SIZE;
+import static org.apache.flink.table.store.file.io.DataFileTestUtils.row;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test {@link LookupLevels}. */
+public class LookupLevelsTest {
+
+ private static final String LOOKUP_FILE_PREFIX = "lookup-";
+
+ @TempDir java.nio.file.Path tempDir;
+
+ private final Comparator<InternalRow> comparator =
Comparator.comparingInt(o -> o.getInt(0));
+
+ private final RowType keyType = DataTypes.ROW(DataTypes.FIELD(0, "_key",
DataTypes.INT()));
+ private final RowType rowType =
+ DataTypes.ROW(
+ DataTypes.FIELD(0, "key", DataTypes.INT()),
+ DataTypes.FIELD(1, "value", DataTypes.INT()));
+
+ @Test
+ public void testMultiLevels() throws IOException {
+ Levels levels =
+ new Levels(
+ comparator,
+ Arrays.asList(
+ newFile(1, kv(1, 11), kv(3, 33), kv(5, 5)),
+ newFile(2, kv(2, 22), kv(5, 55))),
+ 3);
+ LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+
+ // only in level 1
+ KeyValue kv = lookupLevels.lookup(row(1), 1);
+ assertThat(kv).isNotNull();
+ assertThat(kv.level()).isEqualTo(1);
+ assertThat(kv.value().getInt(1)).isEqualTo(11);
+
+ // only in level 2
+ kv = lookupLevels.lookup(row(2), 1);
+ assertThat(kv).isNotNull();
+ assertThat(kv.level()).isEqualTo(2);
+ assertThat(kv.value().getInt(1)).isEqualTo(22);
+
+ // both in level 1 and level 2
+ kv = lookupLevels.lookup(row(5), 1);
+ assertThat(kv).isNotNull();
+ assertThat(kv.level()).isEqualTo(1);
+ assertThat(kv.value().getInt(1)).isEqualTo(5);
+
+        // does not exist in any level
+ kv = lookupLevels.lookup(row(4), 1);
+ assertThat(kv).isNull();
+
+ lookupLevels.close();
+ assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testMultiFiles() throws IOException {
+ Levels levels =
+ new Levels(
+ comparator,
+ Arrays.asList(
+ newFile(1, kv(1, 11), kv(2, 22)),
+ newFile(1, kv(4, 44), kv(5, 55)),
+ newFile(1, kv(7, 77), kv(8, 88)),
+ newFile(1, kv(10, 1010), kv(11, 1111))),
+ 1);
+ LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofMebiBytes(10));
+
+ Map<Integer, Integer> contains =
+ new HashMap<Integer, Integer>() {
+ {
+ this.put(1, 11);
+ this.put(2, 22);
+ this.put(4, 44);
+ this.put(5, 55);
+ this.put(7, 77);
+ this.put(8, 88);
+ this.put(10, 1010);
+ this.put(11, 1111);
+ }
+ };
+ for (Map.Entry<Integer, Integer> entry : contains.entrySet()) {
+ KeyValue kv = lookupLevels.lookup(row(entry.getKey()), 1);
+ assertThat(kv).isNotNull();
+ assertThat(kv.level()).isEqualTo(1);
+ assertThat(kv.value().getInt(1)).isEqualTo(entry.getValue());
+ }
+
+ int[] notContains = new int[] {0, 3, 6, 9, 12};
+ for (int key : notContains) {
+ KeyValue kv = lookupLevels.lookup(row(key), 1);
+ assertThat(kv).isNull();
+ }
+
+ lookupLevels.close();
+ assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ }
+
+ @Test
+ public void testMaxDiskSize() throws IOException {
+ List<DataFileMeta> files = new ArrayList<>();
+ int fileNum = 10;
+ int recordInFile = 100;
+ for (int i = 0; i < fileNum; i++) {
+ List<KeyValue> kvs = new ArrayList<>();
+ for (int j = 0; j < recordInFile; j++) {
+ int key = i * recordInFile + j;
+ kvs.add(kv(key, key));
+ }
+ files.add(newFile(1, kvs.toArray(new KeyValue[0])));
+ }
+ Levels levels = new Levels(comparator, files, 1);
+ LookupLevels lookupLevels = createLookupLevels(levels,
MemorySize.ofKibiBytes(20));
+
+ for (int i = 0; i < fileNum * recordInFile; i++) {
+ KeyValue kv = lookupLevels.lookup(row(i), 1);
+ assertThat(kv).isNotNull();
+ assertThat(kv.level()).isEqualTo(1);
+ assertThat(kv.value().getInt(1)).isEqualTo(i);
+ }
+
+        // some files are invalidated due to the max disk size limit
+ long fileNumber = lookupLevels.lookupFiles().size();
+ String[] lookupFiles =
+ tempDir.toFile().list((dir, name) ->
name.startsWith(LOOKUP_FILE_PREFIX));
+ assertThat(lookupFiles).isNotNull();
+
assertThat(fileNumber).isNotEqualTo(fileNum).isEqualTo(lookupFiles.length);
+
+ lookupLevels.close();
+ assertThat(lookupLevels.lookupFiles().size()).isEqualTo(0);
+ }
+
+ private LookupLevels createLookupLevels(Levels levels, MemorySize
maxDiskSize) {
+ return new LookupLevels(
+ levels,
+ comparator,
+ new InternalRowSerializer(keyType),
+ new InternalRowSerializer(rowType),
+ file -> createReaderFactory().createRecordReader(0,
file.fileName(), file.level()),
+ () -> new File(tempDir.toFile(), LOOKUP_FILE_PREFIX +
UUID.randomUUID()),
+ new HashLookupStoreFactory(new CacheManager(2048,
MemorySize.ofMebiBytes(1)), 0.75),
+ Duration.ofHours(1),
+ maxDiskSize);
+ }
+
+ private KeyValue kv(int key, int value) {
+ return new KeyValue()
+ .replace(GenericRow.of(key), RowKind.INSERT,
GenericRow.of(key, value));
+ }
+
+ private DataFileMeta newFile(int level, KeyValue... records) throws
IOException {
+ RollingFileWriter<KeyValue, DataFileMeta> writer =
+ createWriterFactory().createRollingMergeTreeFileWriter(level);
+ for (KeyValue kv : records) {
+ writer.write(kv);
+ }
+ writer.close();
+ return writer.result().get(0);
+ }
+
+ private KeyValueFileWriterFactory createWriterFactory() {
+ Path path = new Path(tempDir.toUri().toString());
+ return KeyValueFileWriterFactory.builder(
+ FileIOFinder.find(path),
+ 0,
+ keyType,
+ rowType,
+ new FlushingFileFormat("avro"),
+ new FileStorePathFactory(path),
+ TARGET_FILE_SIZE.defaultValue().getBytes())
+ .build(BinaryRow.EMPTY_ROW, 0, null);
+ }
+
+ private KeyValueFileReaderFactory createReaderFactory() {
+ Path path = new Path(tempDir.toUri().toString());
+ KeyValueFileReaderFactory.Builder builder =
+ KeyValueFileReaderFactory.builder(
+ FileIOFinder.find(path),
+ createSchemaManager(path),
+ 0,
+ keyType,
+ rowType,
+ ignore -> new FlushingFileFormat("avro"),
+ new FileStorePathFactory(path),
+ new KeyValueFieldsExtractor() {
+ @Override
+ public List<DataField> keyFields(TableSchema
schema) {
+ return keyType.getFields();
+ }
+
+ @Override
+ public List<DataField> valueFields(TableSchema
schema) {
+ return schema.fields();
+ }
+ });
+ return builder.build(BinaryRow.EMPTY_ROW, 0);
+ }
+
+ private SchemaManager createSchemaManager(Path path) {
+ TableSchema tableSchema =
+ new TableSchema(
+ 0,
+ rowType.getFields(),
+ rowType.getFieldCount(),
+ Collections.emptyList(),
+ Collections.singletonList("key"),
+ Collections.emptyMap(),
+ "");
+ Map<Long, TableSchema> schemas = new HashMap<>();
+ schemas.put(tableSchema.id(), tableSchema);
+ return new SchemaEvolutionTableTestBase.TestingSchemaManager(path,
schemas);
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
index 722bebff..e07bef95 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/FlinkConnectorOptions.java
@@ -110,6 +110,17 @@ public class FlinkConnectorOptions {
+
CoreOptions.ChangelogProducer.FULL_COMPACTION.name()
+ ", full compaction will be constantly
triggered after this interval.");
+ public static final ConfigOption<Boolean> CHANGELOG_PRODUCER_LOOKUP_WAIT =
+ key("changelog-producer.lookup-wait")
+ .booleanType()
+ .defaultValue(true)
+ .withDescription(
+ "When "
+ + CoreOptions.CHANGELOG_PRODUCER.key()
+ + " is set to "
+ +
CoreOptions.ChangelogProducer.LOOKUP.name()
+ + ", commit will wait for changelog
generation by lookup.");
+
public static List<ConfigOption<?>> getOptions() {
final Field[] fields = FlinkConnectorOptions.class.getFields();
final List<ConfigOption<?>> list = new ArrayList<>(fields.length);
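
Illustrative sketch of the relevant table options (keys taken from this patch and the existing 'changelog-producer' table option; surrounding DDL or catalog setup is assumed): enable the lookup changelog producer and, if desired, opt out of waiting for changelog generation at commit time.

    import java.util.HashMap;
    import java.util.Map;

    public class LookupChangelogOptionsSketch {
        public static void main(String[] args) {
            Map<String, String> tableOptions = new HashMap<>();
            // produce changelog by looking up older data during compaction
            tableOptions.put("changelog-producer", "lookup");
            // by default commits wait for changelog generation by lookup; set to false to decouple them
            tableOptions.put("changelog-producer.lookup-wait", "false");
            tableOptions.forEach((k, v) -> System.out.println(k + " = " + v));
        }
    }
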
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
index c842183f..81e13809 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/FlinkSink.java
@@ -31,8 +31,8 @@ import
org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.table.data.RowData;
-import org.apache.flink.table.store.CoreOptions;
import
org.apache.flink.table.store.connector.utils.StreamExecutionEnvironmentUtils;
+import org.apache.flink.table.store.options.Options;
import org.apache.flink.table.store.table.FileStoreTable;
import org.apache.flink.table.store.utils.Preconditions;
import org.apache.flink.util.function.SerializableFunction;
@@ -41,6 +41,7 @@ import java.io.Serializable;
import java.util.UUID;
import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL;
+import static
org.apache.flink.table.store.connector.FlinkConnectorOptions.CHANGELOG_PRODUCER_LOOKUP_WAIT;
/** Abstract sink of table store. */
public abstract class FlinkSink implements Serializable {
@@ -59,26 +60,34 @@ public abstract class FlinkSink implements Serializable {
}
protected StoreSinkWrite.Provider createWriteProvider(String
initialCommitUser) {
- if (table.options().changelogProducer() ==
CoreOptions.ChangelogProducer.FULL_COMPACTION
- && !table.options().writeOnly()) {
- long fullCompactionThresholdMs =
- table.options()
- .toConfiguration()
-
.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
- .toMillis();
- return (table, context, ioManager) ->
- new FullChangelogStoreSinkWrite(
- table,
- context,
- initialCommitUser,
- ioManager,
- isOverwrite,
- fullCompactionThresholdMs);
- } else {
- return (table, context, ioManager) ->
- new StoreSinkWriteImpl(
- table, context, initialCommitUser, ioManager,
isOverwrite);
+ if (!table.options().writeOnly()) {
+ Options options = table.options().toConfiguration();
+ switch (table.options().changelogProducer()) {
+ case FULL_COMPACTION:
+ long fullCompactionThresholdMs =
+
options.get(CHANGELOG_PRODUCER_FULL_COMPACTION_TRIGGER_INTERVAL)
+ .toMillis();
+ return (table, context, ioManager) ->
+ new FullChangelogStoreSinkWrite(
+ table,
+ context,
+ initialCommitUser,
+ ioManager,
+ isOverwrite,
+ fullCompactionThresholdMs);
+ case LOOKUP:
+ if (options.get(CHANGELOG_PRODUCER_LOOKUP_WAIT)) {
+ return (table, context, ioManager) ->
+ new LookupChangelogStoreSinkWrite(
+ table, context, initialCommitUser,
ioManager, isOverwrite);
+ }
+ break;
+ default:
+ }
}
+
+ return (table, context, ioManager) ->
+ new StoreSinkWriteImpl(table, context, initialCommitUser,
ioManager, isOverwrite);
}
public DataStreamSink<?> sinkFrom(DataStream<RowData> input) {
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java
new file mode 100644
index 00000000..4c64d1d7
--- /dev/null
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/main/java/org/apache/flink/table/store/connector/sink/LookupChangelogStoreSinkWrite.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector.sink;
+
+import org.apache.flink.runtime.io.disk.iomanager.IOManager;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.table.store.CoreOptions;
+import org.apache.flink.table.store.table.FileStoreTable;
+
+import java.io.IOException;
+import java.util.List;
+
+/**
+ * {@link StoreSinkWrite} for {@link CoreOptions.ChangelogProducer#LOOKUP}
changelog producer. This
+ * writer will wait for compaction in {@link #prepareCommit}.
+ */
+public class LookupChangelogStoreSinkWrite extends StoreSinkWriteImpl {
+
+ public LookupChangelogStoreSinkWrite(
+ FileStoreTable table,
+ StateInitializationContext context,
+ String initialCommitUser,
+ IOManager ioManager,
+ boolean isOverwrite)
+ throws Exception {
+ super(table, context, initialCommitUser, ioManager, isOverwrite);
+ }
+
+ @Override
+ public List<Committable> prepareCommit(boolean doCompaction, long
checkpointId)
+ throws IOException {
+ return super.prepareCommit(true, checkpointId);
+ }
+}
diff --git
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
index 6b03d548..fe52936b 100644
---
a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
+++
b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/CatalogITCaseBase.java
@@ -32,6 +32,7 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
import org.apache.flink.table.store.connector.util.AbstractTestBase;
import org.apache.flink.table.store.file.Snapshot;
+import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.file.utils.SnapshotManager;
import org.apache.flink.table.store.fs.Path;
import org.apache.flink.table.store.fs.local.LocalFileIO;
@@ -125,6 +126,10 @@ public abstract class CatalogITCaseBase extends AbstractTestBase {
return sEnv.executeSql(String.format(query, args)).collect();
}
+ protected BlockingIterator<Row, Row> streamSqlBlockIter(String query, Object... args) {
+ return BlockingIterator.of(sEnv.executeSql(String.format(query, args)).collect());
+ }
+
protected CatalogTable table(String tableName) throws TableNotExistException {
Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
CatalogBaseTable table =
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
index e57e6693..efedcf28 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/ChangelogWithKeyFileStoreTableITCase.java
@@ -43,6 +43,7 @@ import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -50,6 +51,7 @@ import java.util.UUID;
import java.util.concurrent.ThreadLocalRandom;
import static org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -95,79 +97,10 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
@Test
public void testFullCompactionTriggerInterval() throws Exception {
- TableEnvironment sEnv =
- createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
- sEnv.getConfig()
- .getConfiguration()
- .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
-
- sEnv.executeSql(
- String.format(
- "CREATE CATALOG testCatalog WITH
('type'='table-store', 'warehouse'='%s/warehouse')",
- path));
- sEnv.executeSql("USE CATALOG testCatalog");
- sEnv.executeSql(
- "CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT
ENFORCED ) "
- + "WITH ( "
- + "'bucket' = '2', "
- + "'changelog-producer' = 'full-compaction', "
- + "'changelog-producer.compaction-interval' = '1s' )");
-
- Path inputPath = new Path(path, "input");
- sEnv.executeSql(
- "CREATE TABLE `default_catalog`.`default_database`.`S` ( i
INT, g STRING ) "
- + "WITH ( 'connector' = 'filesystem', 'format' =
'testcsv', 'path' = '"
- + inputPath
- + "', 'source.monitor-interval' = '500ms' )");
-
- sEnv.executeSql(
- "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM
`default_catalog`.`default_database`.`S` GROUP BY g");
- CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM
T").collect();
-
- // write initial data
- sEnv.executeSql(
- "INSERT INTO `default_catalog`.`default_database`.`S` "
- + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4,
'D')")
- .await();
-
- // read initial data
- List<String> actual = new ArrayList<>();
- for (int i = 0; i < 4; i++) {
- actual.add(it.next().toString());
- }
- actual.sort(String::compareTo);
-
- List<String> expected =
- new ArrayList<>(Arrays.asList("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]"));
- expected.sort(String::compareTo);
- assertEquals(expected, actual);
-
- // write update data
- sEnv.executeSql(
- "INSERT INTO `default_catalog`.`default_database`.`S` "
- + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1,
'D')")
- .await();
-
- // read update data
- actual.clear();
- for (int i = 0; i < 8; i++) {
- actual.add(it.next().toString());
- }
- actual.sort(String::compareTo);
-
- expected =
- new ArrayList<>(
- Arrays.asList(
- "-D[1, A]",
- "-U[2, B]",
- "+U[2, A]",
- "-U[3, C]",
- "+U[3, B]",
- "-U[4, D]",
- "+U[4, C]",
- "+I[5, D]"));
- expected.sort(String::compareTo);
- assertEquals(expected, actual);
+ innerTestChangelogProducing(
+ Arrays.asList(
+ "'changelog-producer' = 'full-compaction'",
+ "'changelog-producer.compaction-interval' = '1s'"));
}
@Test
@@ -233,6 +166,80 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
it.close();
}
+ @Test
+ public void testLookupChangelog() throws Exception {
+ innerTestChangelogProducing(Collections.singletonList("'changelog-producer' = 'lookup'"));
+ }
+
+ private void innerTestChangelogProducing(List<String> options) throws Exception {
+ TableEnvironment sEnv =
+ createStreamingTableEnvironment(ThreadLocalRandom.current().nextInt(900) + 100);
+ sEnv.getConfig()
+ .getConfiguration()
+ .set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 1);
+
+ sEnv.executeSql(
+ String.format(
+ "CREATE CATALOG testCatalog WITH
('type'='table-store', 'warehouse'='%s/warehouse')",
+ path));
+ sEnv.executeSql("USE CATALOG testCatalog");
+ sEnv.executeSql(
+ "CREATE TABLE T ( k INT, v STRING, PRIMARY KEY (k) NOT
ENFORCED ) "
+ + "WITH ( "
+ + "'bucket' = '2', "
+ + String.join(",", options)
+ + ")");
+
+ Path inputPath = new Path(path, "input");
+ sEnv.executeSql(
+ "CREATE TABLE `default_catalog`.`default_database`.`S` ( i
INT, g STRING ) "
+ + "WITH ( 'connector' = 'filesystem', 'format' =
'testcsv', 'path' = '"
+ + inputPath
+ + "', 'source.monitor-interval' = '500ms' )");
+
+ sEnv.executeSql(
+ "INSERT INTO T SELECT SUM(i) AS k, g AS v FROM
`default_catalog`.`default_database`.`S` GROUP BY g");
+ CloseableIterator<Row> it = sEnv.executeSql("SELECT * FROM
T").collect();
+
+ // write initial data
+ sEnv.executeSql(
+ "INSERT INTO `default_catalog`.`default_database`.`S` "
+ + "VALUES (1, 'A'), (2, 'B'), (3, 'C'), (4,
'D')")
+ .await();
+
+ // read initial data
+ List<String> actual = new ArrayList<>();
+ for (int i = 0; i < 4; i++) {
+ actual.add(it.next().toString());
+ }
+
+ assertThat(actual)
+ .containsExactlyInAnyOrder("+I[1, A]", "+I[2, B]", "+I[3, C]", "+I[4, D]");
+
+ // write update data
+ sEnv.executeSql(
+ "INSERT INTO `default_catalog`.`default_database`.`S` "
+ + "VALUES (1, 'A'), (1, 'B'), (1, 'C'), (1,
'D')")
+ .await();
+
+ // read update data
+ actual.clear();
+ for (int i = 0; i < 8; i++) {
+ actual.add(it.next().toString());
+ }
+
+ assertThat(actual)
+ .containsExactlyInAnyOrder(
+ "-D[1, A]",
+ "-U[2, B]",
+ "+U[2, A]",
+ "-U[3, C]",
+ "+U[3, B]",
+ "-U[4, D]",
+ "+U[4, C]",
+ "+I[5, D]");
+ }
+
// ------------------------------------------------------------------------
// Random Tests
// ------------------------------------------------------------------------
@@ -275,6 +282,29 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
testStandAloneFullCompactJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
}
+ @Test
+ @Timeout(600000)
+ public void testLookupChangelogProducerBatchRandom() throws Exception {
+ TableEnvironment bEnv = createBatchTableEnvironment();
+ testLookupChangelogProducerRandom(bEnv, 1, false);
+ }
+
+ @Test
+ @Timeout(600000)
+ public void testLookupChangelogProducerStreamingRandom() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
+ testLookupChangelogProducerRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+ }
+
+ @Test
+ @Timeout(600000)
+ public void testStandAloneLookupJobRandom() throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+ TableEnvironment sEnv = createStreamingTableEnvironment(random.nextInt(900) + 100);
+ testStandAloneLookupJobRandom(sEnv, random.nextInt(1, 3), random.nextBoolean());
+ }
+
private static final int NUM_PARTS = 4;
private static final int NUM_KEYS = 64;
private static final int NUM_VALUES = 1024;
@@ -308,7 +338,28 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
// if we can first read complete records then read incremental records correctly
Thread.sleep(random.nextInt(5000));
- checkFullCompactionTestResult(numProducers);
+ checkChangelogTestResult(numProducers);
+ }
+
+ private void testLookupChangelogProducerRandom(
+ TableEnvironment tEnv, int numProducers, boolean enableFailure) throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ testRandom(
+ tEnv,
+ numProducers,
+ enableFailure,
+ "'bucket' = '4',"
+ + String.format("'write-buffer-size' = '%s',", random.nextBoolean() ? "512kb" : "1mb")
+ + "'changelog-producer' = 'lookup'");
+
+ // sleep for a random amount of time to check
+ // if we can first read complete records then read incremental records correctly
+ Thread.sleep(random.nextInt(5000));
+
+ checkChangelogTestResult(numProducers);
}
private void testStandAloneFullCompactJobRandom(
@@ -343,10 +394,44 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
// if we can first read complete records then read incremental records correctly
Thread.sleep(random.nextInt(2500));
- checkFullCompactionTestResult(numProducers);
+ checkChangelogTestResult(numProducers);
+ }
+
+ private void testStandAloneLookupJobRandom(
+ TableEnvironment tEnv, int numProducers, boolean enableConflicts) throws Exception {
+ ThreadLocalRandom random = ThreadLocalRandom.current();
+
+ testRandom(
+ tEnv,
+ numProducers,
+ false,
+ "'bucket' = '4',"
+ + String.format("'write-buffer-size' = '%s',", random.nextBoolean() ? "512kb" : "1mb")
+ + "'changelog-producer' = 'lookup',"
+ + "'write-only' = 'true'");
+
+ // sleep for a random amount of time to check
+ // if dedicated compactor job can find first snapshot to compact correctly
+ Thread.sleep(random.nextInt(2500));
+
+ for (int i = enableConflicts ? 2 : 1; i > 0; i--) {
+ StreamExecutionEnvironment env =
+ createStreamExecutionEnvironment(random.nextInt(1900) + 100);
+ env.setParallelism(2);
+ FlinkActions.compact(path, "default", "T").build(env);
+ env.executeAsync();
+ }
+
+ // sleep for a random amount of time to check
+ // if we can first read complete records then read incremental records correctly
+ Thread.sleep(random.nextInt(2500));
+
+ checkChangelogTestResult(numProducers);
}
- private void checkFullCompactionTestResult(int numProducers) throws Exception {
+ private void checkChangelogTestResult(int numProducers) throws Exception {
TableEnvironment sEnv = createStreamingTableEnvironment(100);
sEnv.getConfig()
.getConfiguration()
@@ -436,7 +521,7 @@ public class ChangelogWithKeyFileStoreTableITCase extends AbstractTestBase {
List<TableResult> results = new ArrayList<>();
if (enableFailure) {
- FailingFileIO.reset(failingName, 100, 1000);
+ FailingFileIO.reset(failingName, 100, 10000);
}
for (int i = 0; i < numProducers; i++) {
// for the last `NUM_PARTS * NUM_KEYS` records, we update every key to a specific value
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java
new file mode 100644
index 00000000..a863e113
--- /dev/null
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupChangelogWithAggITCase.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.store.file.utils.BlockingIterator;
+import org.apache.flink.types.Row;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests the lookup changelog producer with aggregation tables. */
+public class LookupChangelogWithAggITCase extends CatalogITCaseBase {
+
+ @Test
+ public void testMultipleCompaction() throws Exception {
+ sql(
+ "CREATE TABLE T (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ("
+ + "'bucket'='3', "
+ + "'changelog-producer'='lookup', "
+ + "'merge-engine'='aggregation', "
+ + "'fields.v.aggregate-function'='sum')");
+ BlockingIterator<Row, Row> iterator = streamSqlBlockIter("SELECT * FROM T");
+
+ sql("INSERT INTO T VALUES (1, 1), (2, 2)");
+ assertThat(iterator.collect(2)).containsExactlyInAnyOrder(Row.of(1, 1), Row.of(2, 2));
+
+ for (int i = 1; i < 5; i++) {
+ sql("INSERT INTO T VALUES (1, 1), (2, 2)");
+ assertThat(iterator.collect(4))
+ .containsExactlyInAnyOrder(
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, i),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 2, 2 * i),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, i + 1),
+ Row.ofKind(RowKind.UPDATE_AFTER, 2, 2 * (i + 1)));
+ }
+
+ iterator.close();
+ }
+}
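A reading aid for the loop in the test above: every INSERT adds 1 to the sum aggregate of key 1 and 2 to the sum aggregate of key 2, so iteration i should retract the previous sums and emit the new ones. A standalone sketch of that expectation, using only Row/RowKind from the test's own imports plus java.util; the helper name is hypothetical.

    // Expected changelog rows for iteration i of the loop above:
    // key 1 goes from i to i + 1, key 2 goes from 2 * i to 2 * (i + 1).
    static List<Row> expectedChangelog(int i) {
        return Arrays.asList(
                Row.ofKind(RowKind.UPDATE_BEFORE, 1, i),
                Row.ofKind(RowKind.UPDATE_BEFORE, 2, 2 * i),
                Row.ofKind(RowKind.UPDATE_AFTER, 1, i + 1),
                Row.ofKind(RowKind.UPDATE_AFTER, 2, 2 * (i + 1)));
    }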
diff --git a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
index 76023c8f..4fd36130 100644
--- a/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
+++ b/flink-table-store-flink/flink-table-store-flink-common/src/test/java/org/apache/flink/table/store/connector/LookupJoinITCase.java
@@ -466,7 +466,7 @@ public class LookupJoinITCase extends CatalogITCaseBase {
assertThatThrownBy(() -> sEnv.executeSql(query))
.hasRootCauseMessage(
"Partial update continuous reading is not supported. "
- + "You can use full compaction changelog
producer to support streaming reading.");
+ + "You can use 'lookup' or 'full-compaction'
changelog producer to support streaming reading.");
}
@Test