This is an automated email from the ASF dual-hosted git repository. wanglijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 41b35260bba91463bd9e44a4661beaa74c4cbe10 Author: Lijie Wang <[email protected]> AuthorDate: Mon Jul 17 14:22:34 2023 +0800 [FLINK-32493][table-runtime] Introduce GlobalRuntimeFilterBuilderOperator This closes #23004 --- .../GlobalRuntimeFilterBuilderOperator.java | 94 +++++++++++ .../GlobalRuntimeFilterBuilderOperatorTest.java | 173 +++++++++++++++++++++ 2 files changed, 267 insertions(+) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperator.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperator.java new file mode 100644 index 00000000000..f591df68f86 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperator.java @@ -0,0 +1,94 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.runtimefilter; + +import org.apache.flink.runtime.operators.util.BloomFilter; +import org.apache.flink.streaming.api.operators.BoundedOneInput; +import org.apache.flink.streaming.api.operators.OneInputStreamOperator; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.TableStreamOperator; +import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils; +import org.apache.flink.table.runtime.util.StreamRecordCollector; +import org.apache.flink.util.Collector; + +import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT; +import static org.apache.flink.util.Preconditions.checkArgument; + +/** Global runtime filter builder operator. */ +public class GlobalRuntimeFilterBuilderOperator extends TableStreamOperator<RowData> + implements OneInputStreamOperator<RowData, RowData>, BoundedOneInput { + + /** + * The maximum number of rows to build the bloom filter. Once the actual number of rows received + * is greater than this value, we will give up building the bloom filter and directly output an + * empty filter. + */ + private final int maxRowCount; + + private transient byte[] serializedGlobalFilter; + private transient Collector<RowData> collector; + private transient int globalRowCount; + + public GlobalRuntimeFilterBuilderOperator(int maxRowCount) { + checkArgument(maxRowCount > 0); + this.maxRowCount = maxRowCount; + } + + @Override + public void open() throws Exception { + super.open(); + this.serializedGlobalFilter = null; + this.collector = new StreamRecordCollector<>(output); + this.globalRowCount = 0; + } + + @Override + public void processElement(StreamRecord<RowData> element) throws Exception { + RowData rowData = element.getValue(); + int localRowCount = rowData.getInt(0); + + if (globalRowCount == OVER_MAX_ROW_COUNT) { + // Current global filter is already over-max-row-count, do nothing. + } else if (localRowCount == OVER_MAX_ROW_COUNT + || globalRowCount + localRowCount > maxRowCount) { + // The received local filter is over-max-row-count, mark the global filter as + // over-max-row-count. + globalRowCount = OVER_MAX_ROW_COUNT; + serializedGlobalFilter = null; + } else { + // merge the local filter + byte[] serializedLocalFilter = rowData.getBinary(1); + if (serializedGlobalFilter == null) { + serializedGlobalFilter = serializedLocalFilter.clone(); + } else { + BloomFilter.mergeSerializedBloomFilters( + serializedGlobalFilter, serializedLocalFilter); + } + globalRowCount += localRowCount; + } + } + + @Override + public void endInput() throws Exception { + collector.collect( + RuntimeFilterUtils.convertBloomFilterToRowData( + globalRowCount, serializedGlobalFilter)); + } +} diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperatorTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperatorTest.java new file mode 100644 index 00000000000..002de3b7d9d --- /dev/null +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/runtimefilter/GlobalRuntimeFilterBuilderOperatorTest.java @@ -0,0 +1,173 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.operators.runtimefilter; + +import org.apache.flink.runtime.io.network.api.EndOfData; +import org.apache.flink.runtime.io.network.api.StopMode; +import org.apache.flink.runtime.operators.util.BloomFilter; +import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import org.apache.flink.streaming.runtime.tasks.OneInputStreamTask; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarness; +import org.apache.flink.streaming.runtime.tasks.StreamTaskMailboxTestHarnessBuilder; +import org.apache.flink.table.data.GenericRowData; +import org.apache.flink.table.data.RowData; +import org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils; +import org.apache.flink.table.runtime.typeutils.InternalTypeInfo; +import org.apache.flink.table.types.logical.BinaryType; +import org.apache.flink.table.types.logical.IntType; + +import org.junit.jupiter.api.Test; + +import java.util.Queue; + +import static org.apache.flink.table.runtime.operators.runtimefilter.util.RuntimeFilterUtils.OVER_MAX_ROW_COUNT; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link GlobalRuntimeFilterBuilderOperator}. */ +class GlobalRuntimeFilterBuilderOperatorTest { + + @Test + void testNormalInputAndNormalOutput() throws Exception { + try (StreamTaskMailboxTestHarness<RowData> testHarness = + createGlobalRuntimeFilterBuilderOperatorHarness(10)) { + // process elements + testHarness.processElement( + new StreamRecord<RowData>( + GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1())))); + testHarness.processElement( + new StreamRecord<RowData>( + GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2())))); + testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0); + + // test the output + Queue<Object> outputs = testHarness.getOutput(); + assertThat(outputs.size()).isEqualTo(1); + + RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue(); + assertThat(outputRowData.getArity()).isEqualTo(2); + + int globalCount = outputRowData.getInt(0); + BloomFilter globalBloomFilter = BloomFilter.fromBytes(outputRowData.getBinary(1)); + assertThat(globalCount).isEqualTo(10); + assertThat(globalBloomFilter.testHash("var1".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var2".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var3".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var4".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var5".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var6".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var7".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var8".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var9".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var10".hashCode())).isTrue(); + assertThat(globalBloomFilter.testHash("var11".hashCode())).isFalse(); + assertThat(globalBloomFilter.testHash("var12".hashCode())).isFalse(); + assertThat(globalBloomFilter.testHash("var13".hashCode())).isFalse(); + assertThat(globalBloomFilter.testHash("var14".hashCode())).isFalse(); + assertThat(globalBloomFilter.testHash("var15".hashCode())).isFalse(); + } + } + + /** + * Test the case that all input local runtime filters are normal, but the merged global filter + * is over-max-row-count. + */ + @Test + void testNormalInputAndOverMaxRowCountOutput() throws Exception { + try (StreamTaskMailboxTestHarness<RowData> testHarness = + createGlobalRuntimeFilterBuilderOperatorHarness(9)) { + // process elements + testHarness.processElement( + new StreamRecord<RowData>( + GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1())))); + testHarness.processElement( + new StreamRecord<RowData>( + GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter2())))); + testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0); + + // test the output + Queue<Object> outputs = testHarness.getOutput(); + assertThat(outputs.size()).isEqualTo(1); + + RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue(); + assertThat(outputRowData.getArity()).isEqualTo(2); + + int globalCount = outputRowData.getInt(0); + assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT); + assertThat(outputRowData.isNullAt(1)).isTrue(); + } + } + + /** Test the case that one of the input local runtime filters is over-max-row-count. */ + @Test + void testOverMaxRowCountInput() throws Exception { + try (StreamTaskMailboxTestHarness<RowData> testHarness = + createGlobalRuntimeFilterBuilderOperatorHarness(10)) { + // process elements + testHarness.processElement( + new StreamRecord<RowData>( + GenericRowData.of(5, BloomFilter.toBytes(createBloomFilter1())))); + testHarness.processElement( + new StreamRecord<RowData>(GenericRowData.of(OVER_MAX_ROW_COUNT, null))); + testHarness.processEvent(new EndOfData(StopMode.DRAIN), 0); + + // test the output + Queue<Object> outputs = testHarness.getOutput(); + assertThat(outputs.size()).isEqualTo(1); + + RowData outputRowData = ((StreamRecord<RowData>) outputs.poll()).getValue(); + assertThat(outputRowData.getArity()).isEqualTo(2); + + int globalCount = outputRowData.getInt(0); + assertThat(globalCount).isEqualTo(OVER_MAX_ROW_COUNT); + assertThat(outputRowData.isNullAt(1)).isTrue(); + } + } + + private static BloomFilter createBloomFilter1() { + final BloomFilter bloomFilter1 = RuntimeFilterUtils.createOnHeapBloomFilter(10, 0.05); + bloomFilter1.addHash("var1".hashCode()); + bloomFilter1.addHash("var2".hashCode()); + bloomFilter1.addHash("var3".hashCode()); + bloomFilter1.addHash("var4".hashCode()); + bloomFilter1.addHash("var5".hashCode()); + return bloomFilter1; + } + + private static BloomFilter createBloomFilter2() { + final BloomFilter bloomFilter2 = RuntimeFilterUtils.createOnHeapBloomFilter(10, 0.05); + bloomFilter2.addHash("var6".hashCode()); + bloomFilter2.addHash("var7".hashCode()); + bloomFilter2.addHash("var8".hashCode()); + bloomFilter2.addHash("var9".hashCode()); + bloomFilter2.addHash("var10".hashCode()); + return bloomFilter2; + } + + private static StreamTaskMailboxTestHarness<RowData> + createGlobalRuntimeFilterBuilderOperatorHarness(int maxRowCount) throws Exception { + final GlobalRuntimeFilterBuilderOperator operator = + new GlobalRuntimeFilterBuilderOperator(maxRowCount); + return new StreamTaskMailboxTestHarnessBuilder<>( + OneInputStreamTask::new, + InternalTypeInfo.ofFields(new IntType(), new BinaryType())) + .setupOutputForSingletonOperatorChain(operator) + .addInput(InternalTypeInfo.ofFields(new IntType(), new BinaryType())) + .build(); + } +}
