This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 37683e67c [flink] Introduce Query Service (#2709)
37683e67c is described below
commit 37683e67cf05e33cc4cb7debc75a425ab17ff856
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jan 17 10:08:00 2024 +0800
[flink] Introduce Query Service (#2709)
---
docs/content/how-to/lookup-joins.md | 44 ++++---
docs/content/how-to/writing-tables.md | 2 +
docs/content/maintenance/configurations.md | 8 +-
.../apache/paimon/table/query/LocalTableQuery.java | 4 +-
.../flink/action/QueryServiceActionFactory.java | 70 ++++++++++
.../paimon/flink/procedure/ProcedureBase.java | 12 +-
.../flink/procedure/QueryServiceProcedure.java | 51 +++++++
.../paimon/flink/query/RemoteTableQuery.java | 13 +-
.../paimon/flink/service/QueryAddressRegister.java | 76 +++++++++++
.../flink/service/QueryExecutorOperator.java | 126 ++++++++++++++++++
.../paimon/flink/service/QueryFileMonitor.java | 146 +++++++++++++++++++++
.../apache/paimon/flink/service/QueryService.java | 68 ++++++++++
.../services/org.apache.paimon.factories.Factory | 2 +
.../paimon/flink/RemoteLookupJoinITCase.java | 34 +++++
.../paimon/service/network/NetworkUtils.java | 68 ++++++++++
15 files changed, 698 insertions(+), 26 deletions(-)
diff --git a/docs/content/how-to/lookup-joins.md b/docs/content/how-to/lookup-joins.md
index a0c8072fb..34973bdc1 100644
--- a/docs/content/how-to/lookup-joins.md
+++ b/docs/content/how-to/lookup-joins.md
@@ -30,7 +30,7 @@ under the License.
Paimon supports lookup joins on tables with primary keys and append tables in Flink. The following example illustrates this feature.
-### Prepare
+## Prepare
First, let's create a Paimon table and update it in real-time.
@@ -69,7 +69,7 @@ CREATE TEMPORARY TABLE Orders (
);
```
-### Normal Lookup
+## Normal Lookup
You can now use `customers` in a lookup join query.
@@ -82,7 +82,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
-### Retry Lookup
+## Retry Lookup
If records of `Orders` (main table) fail to join because the corresponding data of `customers` (lookup table) is not ready yet,
you can consider using Flink's [Delayed Retry Strategy For Lookup](https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/table/sql/queries/hints/#3-enable-delayed-retry-strategy-for-lookup).
@@ -98,7 +98,7 @@ FOR SYSTEM_TIME AS OF o.proc_time AS c
ON o.customer_id = c.id;
```
-### Async Retry Lookup
+## Async Retry Lookup
The problem with synchronous retry is that one record will block subsequent records, causing the entire job to be blocked.
You can consider using async + allow_unordered to avoid blocking; records that fail to join will no longer block
@@ -120,22 +120,34 @@ your streaming job may be blocked. You can try to use `audit_log` system table f
(convert CDC stream to append stream).
{{< /hint >}}
-### Performance
+## Query Service
-The lookup join operator will maintain a RocksDB cache locally and pull the latest updates of the table in real time. Lookup join operator will only pull the necessary data, so your filter conditions are very important for performance.
+You can run a Flink streaming job to start a query service for the table. When a query service is running, Flink lookup joins
+will prioritize fetching data from it, which can improve lookup performance.
-This feature is only suitable for tables containing at most tens of millions of records to avoid excessive use of local disks.
+{{< tabs "query-service" >}}
-## RocksDB Cache Options
-
-The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.
+{{< tab "Flink SQL" >}}
```sql
--- dynamic table hints example
-SELECT o.order_id, o.total, c.country, c.zip
-FROM Orders AS o JOIN customers /*+ OPTIONS('lookup.cache-rows'='20000') */
-FOR SYSTEM_TIME AS OF o.proc_time AS c
-ON o.customer_id = c.id;
+CALL sys.query_service('database_name.table_name', parallelism);
```
-{{< generated/rocksdb_configuration >}}
+{{< /tab >}}
+
+{{< tab "Flink Action" >}}
+
+```bash
+<FLINK_HOME>/bin/flink run \
+ /path/to/paimon-flink-action-{{< version >}}.jar \
+ query_service \
+ --warehouse <warehouse-path> \
+ --database <database-name> \
+ --table <table-name> \
+ [--parallelism <parallelism>] \
+ [--catalog_conf <paimon-catalog-conf> [--catalog_conf <paimon-catalog-conf> ...]]
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
diff --git a/docs/content/how-to/writing-tables.md b/docs/content/how-to/writing-tables.md
index c47f9d97a..d42c1ce83 100644
--- a/docs/content/how-to/writing-tables.md
+++ b/docs/content/how-to/writing-tables.md
@@ -128,9 +128,11 @@ INSERT INTO MyTable SELECT ...
{{< /tabs >}}
## Overwriting
+
Note: If `spark.sql.sources.partitionOverwriteMode` is set to `dynamic` by default in Spark,
then, to make INSERT OVERWRITE work correctly on Paimon tables,
`spark.sql.extensions` should be set to `org.apache.paimon.spark.extensions.PaimonSparkSessionExtensions`.
+
### Overwriting the Whole Table
For unpartitioned tables, Paimon supports overwriting the whole table.
diff --git a/docs/content/maintenance/configurations.md b/docs/content/maintenance/configurations.md
index a6b967a65..f48956462 100644
--- a/docs/content/maintenance/configurations.md
+++ b/docs/content/maintenance/configurations.md
@@ -66,4 +66,10 @@ Flink connector options for paimon.
Spark connector options for paimon.
-{{< generated/spark_connector_configuration >}}
\ No newline at end of file
+{{< generated/spark_connector_configuration >}}
+
+## RocksDB Options
+
+The following options allow users to finely adjust RocksDB for better performance. You can either specify them in table properties or in dynamic table hints.
+
+{{< generated/rocksdb_configuration >}}
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 2ce70332a..1ba0c698a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -147,9 +147,11 @@ public class LocalTableQuery implements TableQuery {
tableView.computeIfAbsent(partition, k -> new HashMap<>()).put(bucket, lookupLevels);
}
+ /** TODO: remove synchronized and support multi-threaded lookup. */
@Nullable
@Override
- public InternalRow lookup(BinaryRow partition, int bucket, InternalRow key) throws IOException {
+ public synchronized InternalRow lookup(BinaryRow partition, int bucket, InternalRow key)
+ throws IOException {
Map<Integer, LookupLevels> buckets = tableView.get(partition);
if (buckets == null || buckets.isEmpty()) {
return null;
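The hunk above makes `LocalTableQuery#lookup` `synchronized` as a stopgap (see the TODO). As a result, concurrent callers cannot touch the lookup structures at the same time. Below is a minimal sketch of that pattern. It assumes a hypothetical `SerializedLookup` class whose `HashMap` and plain counter stand in for non-thread-safe lookup state; it is not Paimon code.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical stand-in for lookup state that is not thread-safe (a HashMap plus a plain
 * counter). Synchronizing every method serializes access, as the patch does for lookup().
 */
final class SerializedLookup {

    private final Map<String, String> index = new HashMap<>();
    private int lookups; // would lose increments under unsynchronized concurrent access

    SerializedLookup(Map<String, String> initial) {
        index.putAll(initial);
    }

    /** Only one thread at a time can run a lookup on this instance. */
    synchronized String lookup(String key) {
        lookups++;
        return index.get(key);
    }

    /** Number of lookups served so far. */
    synchronized int lookupCount() {
        return lookups;
    }
}
```

Both methods lock the same instance. Several threads can therefore share one `SerializedLookup` without corrupting the map or losing counter updates. The trade-off, as the TODO notes, is that lookups run one at a time.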
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java
new file mode 100644
index 000000000..fecbc91e6
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/QueryServiceActionFactory.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.action;
+
+import org.apache.paimon.flink.service.QueryService;
+
+import org.apache.flink.api.java.tuple.Tuple3;
+
+import java.util.Map;
+import java.util.Optional;
+
+/** Factory to create QueryService Action. */
+public class QueryServiceActionFactory implements ActionFactory {
+
+ public static final String IDENTIFIER = "query_service";
+
+ public static final String PARALLELISM = "parallelism";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ @Override
+ public Optional<Action> create(MultipleParameterToolAdapter params) {
+ Tuple3<String, String, String> tablePath = getTablePath(params);
+ Map<String, String> catalogConfig = optionalConfigMap(params, CATALOG_CONF);
+ Map<String, String> tableConfig = optionalConfigMap(params, TABLE_CONF);
+ String parallStr = params.get(PARALLELISM);
+ int parallelism = parallStr == null ? 1 : Integer.parseInt(parallStr);
+ Action action =
+ new TableActionBase(tablePath.f0, tablePath.f1, tablePath.f2, catalogConfig) {
+ @Override
+ public void run() throws Exception {
+ QueryService.build(env, table.copy(tableConfig), parallelism);
+ execute("Query Service job");
+ }
+ };
+ return Optional.of(action);
+ }
+
+ @Override
+ public void printHelp() {
+ System.out.println(
+ "Action \"query_service\" runs a dedicated job that starts a query service for a table.");
+ System.out.println();
+
+ System.out.println("Syntax:");
+ System.out.println(
+ "  query_service --warehouse <warehouse-path> --database <database-name> --table <table-name> [--parallelism <parallelism>] "
+ + "[--catalog_conf <key>=<value> [--catalog_conf <key>=<value> ...]] "
+ + "[--table_conf <key>=<value> [--table_conf <key>=<value> ...]]");
+ }
+}
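`QueryServiceActionFactory#create` uses a parallelism of 1 when `--parallelism` is absent. Otherwise it passes the value to `Integer.parseInt`. The sketch below isolates that resolution step in a hypothetical `ParallelismArg` helper over a plain `Map`. The check that rejects non-positive values is an extra guard added for illustration; it is not in the factory.

```java
import java.util.Map;

/** Hypothetical helper mirroring how QueryServiceActionFactory resolves --parallelism. */
final class ParallelismArg {

    static final String PARALLELISM = "parallelism";

    private ParallelismArg() {}

    /**
     * Returns 1 when the argument is absent (as the factory does), otherwise the parsed value.
     * Non-numeric input throws NumberFormatException; the positivity check is sketch-only.
     */
    static int resolve(Map<String, String> params) {
        String value = params.get(PARALLELISM);
        if (value == null) {
            return 1;
        }
        int parallelism = Integer.parseInt(value);
        if (parallelism <= 0) {
            throw new IllegalArgumentException("parallelism must be positive, got " + parallelism);
        }
        return parallelism;
    }
}
```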
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
index 5f71d53c2..fed5c43ac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/ProcedureBase.java
@@ -58,10 +58,7 @@ public abstract class ProcedureBase implements Procedure, Factory {
action.withStreamExecutionEnvironment(env);
action.build();
- ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
- String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
- JobClient jobClient = env.executeAsync(name);
- return execute(jobClient, conf.get(TABLE_DML_SYNC));
+ return execute(env, defaultJobName);
}
protected String[] execute(ProcedureContext procedureContext, JobClient jobClient) {
@@ -70,6 +67,13 @@ public abstract class ProcedureBase implements Procedure, Factory {
return execute(jobClient, conf.get(TABLE_DML_SYNC));
}
+ protected String[] execute(StreamExecutionEnvironment env, String defaultJobName)
+ throws Exception {
+ ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+ String name = conf.getOptional(PipelineOptions.NAME).orElse(defaultJobName);
+ return execute(env.executeAsync(name), conf.get(TABLE_DML_SYNC));
+ }
+
private String[] execute(JobClient jobClient, boolean dmlSync) {
String jobId = jobClient.getJobID().toString();
if (dmlSync) {
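The refactored `execute(env, defaultJobName)` uses the configured `pipeline.name` if one is set. Only when none is set does it fall back to the caller's default. `QueryServiceProcedure` passes its `IDENTIFIER` (`query_service`) as that default. Below is a minimal sketch of the same fallback. It assumes the configuration is modeled as a plain `Map` rather than Flink's `ReadableConfig`; `JobNameResolver` is a hypothetical name.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper mirroring the job-name fallback in ProcedureBase#execute(env, defaultJobName). */
final class JobNameResolver {

    /** Key of Flink's PipelineOptions.NAME option. */
    static final String PIPELINE_NAME = "pipeline.name";

    private JobNameResolver() {}

    /** Returns the configured pipeline name, or defaultJobName when none is set. */
    static String resolve(Map<String, String> conf, String defaultJobName) {
        return Optional.ofNullable(conf.get(PIPELINE_NAME)).orElse(defaultJobName);
    }
}
```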
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
new file mode 100644
index 000000000..2f236bb4f
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/procedure/QueryServiceProcedure.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.procedure;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.flink.service.QueryService;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.table.procedure.ProcedureContext;
+
+/**
+ * Query Service procedure. Usage:
+ *
+ * <pre><code>
+ * CALL sys.query_service('tableId', parallelism)
+ * </code></pre>
+ */
+public class QueryServiceProcedure extends ProcedureBase {
+
+ public static final String IDENTIFIER = "query_service";
+
+ @Override
+ public String identifier() {
+ return IDENTIFIER;
+ }
+
+ public String[] call(ProcedureContext procedureContext, String tableId, int parallelism)
+ throws Exception {
+ Table table = catalog.getTable(Identifier.fromString(tableId));
+ StreamExecutionEnvironment env = procedureContext.getExecutionEnvironment();
+ QueryService.build(env, table, parallelism);
+ return execute(env, IDENTIFIER);
+ }
+}
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
index 3a821b5e3..2599b4f11 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/query/RemoteTableQuery.java
@@ -28,6 +28,7 @@ import org.apache.paimon.query.QueryLocationImpl;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.service.client.KvQueryClient;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
import org.apache.paimon.table.query.TableQuery;
import org.apache.paimon.utils.ProjectedRow;
import org.apache.paimon.utils.Projection;
@@ -47,11 +48,11 @@ public class RemoteTableQuery implements TableQuery {
private final KvQueryClient client;
private final InternalRowSerializer keySerializer;
- private int[] projection;
+ @Nullable private int[] projection;
- public RemoteTableQuery(FileStoreTable table) {
- this.table = table;
- ServiceManager manager = table.store().newServiceManager();
+ public RemoteTableQuery(Table table) {
+ this.table = (FileStoreTable) table;
+ ServiceManager manager = this.table.store().newServiceManager();
this.client = new KvQueryClient(new QueryLocationImpl(manager), 1);
this.keySerializer =
InternalSerializers.create(TypeUtils.project(table.rowType(), table.primaryKeys()));
@@ -79,6 +80,10 @@ public class RemoteTableQuery implements TableQuery {
throw new IOException(e.getCause());
}
+ if (projection == null) {
+ return row;
+ }
+
if (row == null) {
return null;
}
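`projection` is now `@Nullable`. When no projection was set, `RemoteTableQuery#lookup` returns the fetched row untouched, and a missing row stays `null` either way. Below is a sketch of that rule. It assumes rows are modeled as `Object[]` instead of Paimon's `InternalRow`/`ProjectedRow`; `RowProjection` is a hypothetical helper.

```java
/** Hypothetical helper restating the null-projection rule from RemoteTableQuery#lookup. */
final class RowProjection {

    private RowProjection() {}

    /**
     * Returns row itself when projection is null (or row is null); otherwise returns a new
     * array holding the fields at the projected indexes, in projection order.
     */
    static Object[] project(Object[] row, int[] projection) {
        if (projection == null || row == null) {
            return row;
        }
        Object[] projected = new Object[projection.length];
        for (int i = 0; i < projection.length; i++) {
            projected[i] = row[projection[i]];
        }
        return projected;
    }
}
```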
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
new file mode 100644
index 000000000..ed883d9eb
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryAddressRegister.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.service;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.service.ServiceManager;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
+import org.apache.flink.streaming.api.functions.sink.SinkFunction;
+
+import java.net.InetSocketAddress;
+import java.util.TreeMap;
+
+import static org.apache.paimon.service.ServiceManager.PRIMARY_KEY_LOOKUP;
+
+/** Sink that collects query executor addresses and registers them to {@link ServiceManager}. */
+public class QueryAddressRegister extends RichSinkFunction<InternalRow> {
+
+ private final Table table;
+
+ private transient int numberExecutors;
+ private transient TreeMap<Integer, InetSocketAddress> executors;
+
+ public QueryAddressRegister(Table table) {
+ this.table = table;
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ this.executors = new TreeMap<>();
+ }
+
+ @Override
+ public void invoke(InternalRow row, SinkFunction.Context context) {
+ int numberExecutors = row.getInt(0);
+ if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
+ throw new IllegalArgumentException(
+ String.format(
+ "Number of executors cannot be changed! Old: %s, new: %s.",
+ this.numberExecutors, numberExecutors));
+ }
+ this.numberExecutors = numberExecutors;
+
+ int executorId = row.getInt(1);
+ String hostname = row.getString(2).toString();
+ int port = row.getInt(3);
+
+ executors.put(executorId, new InetSocketAddress(hostname, port));
+
+ if (executors.size() == numberExecutors) {
+ FileStoreTable storeTable = (FileStoreTable) table;
+ ServiceManager manager = storeTable.store().newServiceManager();
+ manager.resetService(
+ PRIMARY_KEY_LOOKUP, executors.values().toArray(new InetSocketAddress[0]));
+ }
+ }
+}
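`QueryAddressRegister` runs with parallelism 1 and keeps executor addresses in a `TreeMap` keyed by subtask index. It rejects any change in the executor count. Once every executor has reported, it publishes the full address list to `ServiceManager`.

The sketch below reproduces that bookkeeping without Flink or Paimon. It assumes a hypothetical `AddressCollector` that returns the list instead of calling `resetService`. It also uses `InetSocketAddress.createUnresolved` to skip the DNS lookup that `new InetSocketAddress(host, port)` performs.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;

/** Hypothetical, framework-free version of the QueryAddressRegister bookkeeping. */
final class AddressCollector {

    private int numberExecutors; // 0 means "not known yet", as in QueryAddressRegister
    private final TreeMap<Integer, InetSocketAddress> executors = new TreeMap<>();

    /**
     * Records one executor's address. Returns all addresses ordered by executor id once every
     * executor has reported, otherwise null.
     */
    List<InetSocketAddress> register(int numberExecutors, int executorId, String host, int port) {
        if (this.numberExecutors != 0 && this.numberExecutors != numberExecutors) {
            throw new IllegalArgumentException(
                    String.format(
                            "Number of executors cannot be changed! Old: %s, new: %s.",
                            this.numberExecutors, numberExecutors));
        }
        this.numberExecutors = numberExecutors;
        executors.put(executorId, InetSocketAddress.createUnresolved(host, port));
        return executors.size() == numberExecutors ? new ArrayList<>(executors.values()) : null;
    }
}
```

Because the `TreeMap` is ordered by executor id, the published list is in subtask order. A restarted executor that reports again under the same id replaces its old address and triggers a fresh publish.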
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
new file mode 100644
index 000000000..556c30839
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryExecutorOperator.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.service;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.io.DataFileMetaSerializer;
+import org.apache.paimon.service.network.NetworkUtils;
+import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
+import org.apache.paimon.service.server.KvQueryServer;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.query.LocalTableQuery;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+
+import java.net.InetSocketAddress;
+import java.util.Collections;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/** Operator for query executor. */
+public class QueryExecutorOperator extends AbstractStreamOperator<InternalRow>
+ implements OneInputStreamOperator<InternalRow, InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Table table;
+
+ private transient LocalTableQuery query;
+
+ private transient IOManager ioManager;
+
+ public QueryExecutorOperator(Table table) {
+ this.table = table;
+ }
+
+ public static RowType outputType() {
+ return RowType.of(DataTypes.INT(), DataTypes.INT(), DataTypes.STRING(), DataTypes.INT());
+ }
+
+ @Override
+ public void initializeState(StateInitializationContext context) throws Exception {
+ super.initializeState(context);
+ this.ioManager =
+ IOManager.create(
+ getContainingTask()
+ .getEnvironment()
+ .getIOManager()
+ .getSpillingDirectoriesPaths());
+ this.query = ((FileStoreTable) table).newLocalTableQuery().withIOManager(ioManager);
+ KvQueryServer server =
+ new KvQueryServer(
+ getRuntimeContext().getIndexOfThisSubtask(),
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ NetworkUtils.findHostAddress(),
+ Collections.singletonList(0).iterator(),
+ 1,
+ 1,
+ query,
+ new DisabledServiceRequestStats());
+
+ try {
+ server.start();
+ } catch (Throwable e) {
+ throw new RuntimeException(e);
+ }
+
+ InetSocketAddress address = server.getServerAddress();
+ this.output.collect(
+ new StreamRecord<>(
+ GenericRow.of(
+ getRuntimeContext().getNumberOfParallelSubtasks(),
+ getRuntimeContext().getIndexOfThisSubtask(),
+ BinaryString.fromString(address.getHostName()),
+ address.getPort())));
+ }
+
+ @Override
+ public void processElement(StreamRecord<InternalRow> streamRecord) throws Exception {
+ InternalRow row = streamRecord.getValue();
+ BinaryRow partition = deserializeBinaryRow(row.getBinary(1));
+ int bucket = row.getInt(2);
+ DataFileMetaSerializer fileMetaSerializer = new DataFileMetaSerializer();
+ List<DataFileMeta> beforeFiles = fileMetaSerializer.deserializeList(row.getBinary(3));
+ List<DataFileMeta> dataFiles = fileMetaSerializer.deserializeList(row.getBinary(4));
+ query.refreshFiles(partition, bucket, beforeFiles, dataFiles);
+ }
+
+ @Override
+ public void close() throws Exception {
+ super.close();
+ if (query != null) {
+ query.close();
+ }
+ if (ioManager != null) {
+ ioManager.close();
+ }
+ }
+}
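`QueryExecutorOperator` does two things. It starts a `KvQueryServer` and emits one `(numSubtasks, subtaskIndex, hostname, port)` row. It also forwards each file-change row to `LocalTableQuery#refreshFiles(partition, bucket, beforeFiles, dataFiles)`.

The sketch below shows one plausible reading of that per-bucket update. It assumes a hypothetical `BucketFileView` that tracks file names as strings, drops `beforeFiles` and adds `dataFiles`. Paimon's lookup levels maintain far more state than this.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical per-(partition, bucket) view of live data files, keyed by file name. */
final class BucketFileView {

    private final Map<String, Map<Integer, Set<String>>> files = new HashMap<>();

    /** Removes files that were replaced (beforeFiles) and adds the new ones (dataFiles). */
    void refreshFiles(String partition, int bucket, List<String> beforeFiles, List<String> dataFiles) {
        Set<String> current =
                files.computeIfAbsent(partition, p -> new HashMap<>())
                        .computeIfAbsent(bucket, b -> new TreeSet<>());
        current.removeAll(beforeFiles);
        current.addAll(dataFiles);
    }

    /** Returns a sorted copy of the live files of one bucket (empty if unknown). */
    Set<String> filesOf(String partition, int bucket) {
        Map<Integer, Set<String>> buckets = files.get(partition);
        if (buckets == null || !buckets.containsKey(bucket)) {
            return new TreeSet<>();
        }
        return new TreeSet<>(buckets.get(bucket));
    }
}
```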
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
new file mode 100644
index 000000000..4cd34ab26
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryFileMonitor.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.service;
+
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.options.Options;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.table.sink.ChannelComputer;
+import org.apache.paimon.table.source.ReadBuilder;
+import org.apache.paimon.table.source.StreamTableScan;
+import org.apache.paimon.table.source.TableRead;
+import org.apache.paimon.table.system.FileMonitorTable;
+
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/**
+ * This is the single (non-parallel) monitoring task. It is responsible for:
+ *
+ * <ol>
+ * <li>Reading incremental file changes from the table.
+ * <li>Assigning them to downstream tasks for further processing.
+ * </ol>
+ */
+public class QueryFileMonitor extends RichSourceFunction<InternalRow> {
+
+ private static final long serialVersionUID = 1L;
+
+ private final Table table;
+ private final long monitorInterval;
+
+ private transient SourceContext<InternalRow> ctx;
+ private transient StreamTableScan scan;
+ private transient TableRead read;
+
+ private volatile boolean isRunning = true;
+
+ public QueryFileMonitor(Table table) {
+ this.table = table;
+ this.monitorInterval =
+ Options.fromMap(table.options())
+ .get(CoreOptions.CONTINUOUS_DISCOVERY_INTERVAL)
+ .toMillis();
+ }
+
+ @Override
+ public void open(Configuration parameters) throws Exception {
+ FileMonitorTable monitorTable = new FileMonitorTable((FileStoreTable) table);
+ ReadBuilder readBuilder = monitorTable.newReadBuilder();
+ this.scan = readBuilder.newStreamScan();
+ this.read = readBuilder.newRead();
+ }
+
+ @Override
+ public void run(SourceContext<InternalRow> ctx) throws Exception {
+ this.ctx = ctx;
+ while (isRunning) {
+ boolean isEmpty;
+ synchronized (ctx.getCheckpointLock()) {
+ if (!isRunning) {
+ return;
+ }
+ isEmpty = doScan();
+ }
+
+ if (isEmpty) {
+ Thread.sleep(monitorInterval);
+ }
+ }
+ }
+
+ private boolean doScan() throws Exception {
+ List<InternalRow> records = new ArrayList<>();
+ read.createReader(scan.plan()).forEachRemaining(records::add);
+ records.forEach(ctx::collect);
+ return records.isEmpty();
+ }
+
+ @Override
+ public void cancel() {
+ // this is to cover the case where cancel() is called before the run()
+ if (ctx != null) {
+ synchronized (ctx.getCheckpointLock()) {
+ isRunning = false;
+ }
+ } else {
+ isRunning = false;
+ }
+ }
+
+ public static DataStream<InternalRow> build(StreamExecutionEnvironment env, Table table) {
+ return env.addSource(
+ new QueryFileMonitor(table),
+ "FileMonitor-" + table.name(),
+ InternalTypeInfo.fromRowType(FileMonitorTable.getRowType()));
+ }
+
+ public static ChannelComputer<InternalRow> createChannelComputer() {
+ return new FileMonitorChannelComputer();
+ }
+
+ /** A {@link ChannelComputer} to handle rows from {@link FileMonitorTable}. */
+ private static class FileMonitorChannelComputer implements ChannelComputer<InternalRow> {
+
+ private int numChannels;
+
+ @Override
+ public void setup(int numChannels) {
+ this.numChannels = numChannels;
+ }
+
+ @Override
+ public int channel(InternalRow row) {
+ BinaryRow partition = deserializeBinaryRow(row.getBinary(1));
+ int bucket = row.getInt(2);
+ return ChannelComputer.select(partition, bucket, numChannels);
+ }
+ }
+}
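`FileMonitorChannelComputer` routes every file-change row by its `(partition, bucket)` pair. All changes for one bucket therefore reach the same executor subtask, and that subtask's view of the bucket stays complete. Below is a minimal sketch of such deterministic routing. It assumes the partition is reduced to an `int` hash. The start-channel-plus-bucket formula in the hypothetical `BucketRouter` is an illustration and may differ from what `ChannelComputer.select` actually computes.

```java
/** Hypothetical deterministic router from (partition hash, bucket) to a downstream channel. */
final class BucketRouter {

    private BucketRouter() {}

    /**
     * Picks a per-partition start channel, then offsets it by the (non-negative) bucket number.
     * floorMod keeps the start channel non-negative even for negative hash codes.
     */
    static int select(int partitionHash, int bucket, int numChannels) {
        int startChannel = Math.floorMod(partitionHash, numChannels);
        return (startChannel + bucket) % numChannels;
    }
}
```

Adding the bucket to a per-partition start channel spreads the buckets of one partition across consecutive subtasks, rather than hashing each bucket independently.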
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
new file mode 100644
index 000000000..5b2c13c84
--- /dev/null
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/service/QueryService.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.service;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.flink.utils.InternalTypeInfo;
+import org.apache.paimon.flink.utils.StreamExecutionEnvironmentUtils;
+import org.apache.paimon.table.BucketMode;
+import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.table.Table;
+import org.apache.paimon.utils.Preconditions;
+
+import org.apache.flink.api.common.RuntimeExecutionMode;
+import org.apache.flink.configuration.ExecutionOptions;
+import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+
+import static org.apache.paimon.flink.sink.FlinkStreamPartitioner.partition;
+
+/** A class to build Query Service topology. */
+public class QueryService {
+
+ public static void build(StreamExecutionEnvironment env, Table table, int parallelism) {
+ ReadableConfig conf = StreamExecutionEnvironmentUtils.getConfiguration(env);
+ Preconditions.checkArgument(
+ conf.get(ExecutionOptions.RUNTIME_MODE) == RuntimeExecutionMode.STREAMING,
+ "Query Service only supports streaming mode.");
+
+ FileStoreTable storeTable = (FileStoreTable) table;
+ if (storeTable.bucketMode() != BucketMode.FIXED
+ || storeTable.schema().primaryKeys().isEmpty()) {
+ throw new UnsupportedOperationException(
+ "The bucket mode of "
+ + table.name()
+ + " is not fixed or the table has no primary key.");
+ }
+
+ DataStream<InternalRow> stream = QueryFileMonitor.build(env, table);
+ stream = partition(stream, QueryFileMonitor.createChannelComputer(), parallelism);
+
+ QueryExecutorOperator executorOperator = new QueryExecutorOperator(table);
+ stream.transform(
+ "Executor",
+ InternalTypeInfo.fromRowType(QueryExecutorOperator.outputType()),
+ executorOperator)
+ .setParallelism(parallelism)
+ .addSink(new QueryAddressRegister(table))
+ .setParallelism(1)
+ .setMaxParallelism(1);
+ }
+}
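`QueryService.build` refuses to assemble the topology unless two conditions hold: the job runs in streaming mode, and the table uses fixed buckets with a primary key. The sketch restates those checks with hypothetical, simplified `RuntimeMode` and `BucketMode` enums, so the rules can be exercised without a Flink environment. Paimon's real `BucketMode` may define more values. The exception types follow the diff: Paimon's `Preconditions.checkArgument` throws `IllegalArgumentException`.

```java
import java.util.List;

/** Hypothetical, framework-free restatement of the checks in QueryService.build. */
final class QueryServicePreconditions {

    /** Simplified stand-in for Flink's RuntimeExecutionMode. */
    enum RuntimeMode { STREAMING, BATCH }

    /** Simplified stand-in for Paimon's BucketMode. */
    enum BucketMode { FIXED, DYNAMIC, UNAWARE }

    private QueryServicePreconditions() {}

    static void check(String tableName, RuntimeMode mode, BucketMode bucketMode, List<String> primaryKeys) {
        if (mode != RuntimeMode.STREAMING) {
            throw new IllegalArgumentException("Query Service only supports streaming mode.");
        }
        if (bucketMode != BucketMode.FIXED || primaryKeys.isEmpty()) {
            throw new UnsupportedOperationException(
                    "The bucket mode of " + tableName + " is not fixed or the table has no primary key.");
        }
    }
}
```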
diff --git a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
index fb483b73c..51c0f1ad1 100644
--- a/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
+++ b/paimon-flink/paimon-flink-common/src/main/resources/META-INF/services/org.apache.paimon.factories.Factory
@@ -25,6 +25,7 @@ org.apache.paimon.flink.action.DeleteTagActionFactory
org.apache.paimon.flink.action.ResetConsumerActionFactory
org.apache.paimon.flink.action.MigrateTableActionFactory
org.apache.paimon.flink.action.RemoveOrphanFilesActionFactory
+org.apache.paimon.flink.action.QueryServiceActionFactory
### procedure factories
org.apache.paimon.flink.procedure.CompactDatabaseProcedure
@@ -38,3 +39,4 @@ org.apache.paimon.flink.procedure.RollbackToProcedure
org.apache.paimon.flink.procedure.MigrateTableProcedure
org.apache.paimon.flink.procedure.MigrateFileProcedure
org.apache.paimon.flink.procedure.RemoveOrphanFilesProcedure
+org.apache.paimon.flink.procedure.QueryServiceProcedure
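These two lines register QueryServiceActionFactory and QueryServiceProcedure for discovery through Java's service-provider mechanism (java.util.ServiceLoader reads files under META-INF/services). A minimal sketch of discovery followed by a lookup by identifier, using a hypothetical NamedFactory interface rather than Paimon's Factory; the string "query_service" is taken from the test's CALL sys.query_service statement.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

/** Hypothetical stand-in for a discoverable factory; not Paimon's Factory interface. */
interface NamedFactory {
    String identifier();
}

final class FactoryDiscoverySketch {

    /** Collects every NamedFactory listed in META-INF/services files visible to the loader. */
    static List<NamedFactory> loadAll(ClassLoader classLoader) {
        List<NamedFactory> result = new ArrayList<>();
        for (NamedFactory factory : ServiceLoader.load(NamedFactory.class, classLoader)) {
            result.add(factory);
        }
        return result;
    }

    /** Returns the single candidate whose identifier matches; fails on zero or several. */
    static NamedFactory find(List<NamedFactory> candidates, String identifier) {
        List<NamedFactory> matches = new ArrayList<>();
        for (NamedFactory factory : candidates) {
            if (factory.identifier().equals(identifier)) {
                matches.add(factory);
            }
        }
        if (matches.isEmpty()) {
            throw new IllegalArgumentException(
                    "No factory found for identifier '" + identifier + "'");
        }
        if (matches.size() > 1) {
            throw new IllegalStateException(
                    "Multiple factories found for identifier '" + identifier + "'");
        }
        return matches.get(0);
    }
}
```

Forgetting the services entry would make a newly added factory invisible to ServiceLoader, so a lookup like find would fail even though the class is on the classpath.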
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
index 4903ec9a0..a487ed4c1 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/RemoteLookupJoinITCase.java
@@ -21,6 +21,7 @@ package org.apache.paimon.flink;
import org.apache.paimon.data.GenericRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.disk.IOManager;
+import org.apache.paimon.flink.query.RemoteTableQuery;
import org.apache.paimon.service.ServiceManager;
import org.apache.paimon.service.network.stats.DisabledServiceRequestStats;
import org.apache.paimon.service.server.KvQueryServer;
@@ -34,6 +35,7 @@ import org.apache.paimon.table.sink.CommitMessageImpl;
import org.apache.paimon.utils.BlockingIterator;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
import org.junit.jupiter.api.Test;
import java.io.Closeable;
@@ -60,6 +62,38 @@ public class RemoteLookupJoinITCase extends CatalogITCaseBase {
return 1;
}
+ @Test
+ public void testQueryServiceLookup() throws Exception {
+ sql(
+ "CREATE TABLE DIM (k INT PRIMARY KEY NOT ENFORCED, v INT) WITH ('bucket' = '2', 'continuous.discovery-interval' = '1ms')");
+ CloseableIterator<Row> service = streamSqlIter("CALL sys.query_service('default.DIM', 2)");
+ RemoteTableQuery query = new RemoteTableQuery(getPaimonTable("DIM"));
+
+ sql("INSERT INTO DIM VALUES (1, 11), (2, 22), (3, 33), (4, 44)");
+ Thread.sleep(2000);
+
+ assertThat(query.lookup(row(), 0, row(1)))
+ .isNotNull()
+ .extracting(r -> r.getInt(1))
+ .isEqualTo(11);
+ assertThat(query.lookup(row(), 0, row(2)))
+ .isNotNull()
+ .extracting(r -> r.getInt(1))
+ .isEqualTo(22);
+ assertThat(query.lookup(row(), 1, row(3)))
+ .isNotNull()
+ .extracting(r -> r.getInt(1))
+ .isEqualTo(33);
+ assertThat(query.lookup(row(), 0, row(4)))
+ .isNotNull()
+ .extracting(r -> r.getInt(1))
+ .isEqualTo(44);
+ assertThat(query.lookup(row(), 0, row(5))).isNull();
+
+ service.close();
+ query.close();
+ }
+
@Test
public void testLookupRemoteTable() throws Throwable {
sql("CREATE TABLE DIM (i INT PRIMARY KEY NOT ENFORCED, j INT, k1 INT, k2 INT)");
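testQueryServiceLookup waits a fixed two seconds (Thread.sleep(2000)) for the service to pick up the inserted rows. A hypothetical alternative is to poll with a deadline, which returns as soon as the condition holds and gives up only after the timeout; PollingSketch.waitUntil is an illustrative helper, not part of Paimon or Flink.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

/** Hypothetical polling helper; not a Paimon or Flink API. */
final class PollingSketch {

    /**
     * Evaluates the condition repeatedly, sleeping between attempts, until it holds or the
     * timeout elapses. Returns true as soon as the condition holds, false on timeout or
     * interruption (the interrupt flag is restored in that case).
     */
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration interval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (condition.getAsBoolean()) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false;
            }
            // Sleep for the interval, capped at the remaining time but at least 1 ms.
            long sleepMillis =
                    Math.max(1, Math.min(interval.toMillis(), remainingNanos / 1_000_000));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
    }
}
```

In the test this could wrap a condition such as `() -> query.lookup(row(), 0, row(1)) != null`, assuming lookup returns null (rather than throwing) while the executors are still registering their addresses.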
diff --git a/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java
new file mode 100644
index 000000000..66b46ba74
--- /dev/null
+++ b/paimon-service/paimon-service-client/src/main/java/org/apache/paimon/service/network/NetworkUtils.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.service.network;
+
+import java.net.Inet4Address;
+import java.net.InetAddress;
+import java.net.NetworkInterface;
+import java.net.SocketException;
+import java.net.UnknownHostException;
+import java.util.Enumeration;
+import java.util.LinkedList;
+
+/** Utils for network. */
+public class NetworkUtils {
+
+ public static String findHostAddress() throws SocketException, UnknownHostException {
+ return findLocalAddress().getHostAddress();
+ }
+
+ public static InetAddress findLocalAddress() throws UnknownHostException, SocketException {
+ InetAddress localAddress = InetAddress.getLocalHost();
+ if (localAddress.isLoopbackAddress()) {
+ Enumeration<NetworkInterface> activeNetworkInterfaces =
+ NetworkInterface.getNetworkInterfaces();
+ LinkedList<NetworkInterface> reversedNetworkInterfaces = new LinkedList<>();
+ // getNetworkInterfaces returns interfaces in reverse order compared to the ifconfig output
+ // order on Unix-like systems.
+ while (activeNetworkInterfaces.hasMoreElements()) {
+ reversedNetworkInterfaces.addFirst(activeNetworkInterfaces.nextElement());
+ }
+
+ for (NetworkInterface networkInterface : reversedNetworkInterfaces) {
+ Enumeration<InetAddress> addresses = networkInterface.getInetAddresses();
+ while (addresses.hasMoreElements()) {
+ InetAddress address = addresses.nextElement();
+ if (address.isLinkLocalAddress() || address.isLoopbackAddress()) {
+ continue;
+ }
+
+ // Skip non-IPv4 addresses, because Inet6Address.toHostName may append the interface name
+ // at the end if it knows about it.
+ if (!(address instanceof Inet4Address)) {
+ continue;
+ }
+
+ return InetAddress.getByAddress(address.getAddress());
+ }
+ }
+ }
+ return localAddress;
+ }
+}
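The filtering rules in findLocalAddress can be separated from the host's real interfaces by applying them to an explicit, already ordered candidate list. In this sketch (AddressFilterSketch and its methods are illustrative names), an empty result corresponds to the point where the original falls back to the InetAddress.getLocalHost() result; the interface-order reversal is assumed to have been applied to the list already, and the original's final InetAddress.getByAddress(address.getAddress()) copy, which yields an address without a cached host name, is omitted.

```java
import java.net.Inet4Address;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.List;
import java.util.Optional;

/** Illustrative sketch of the address filter; not part of Paimon. */
final class AddressFilterSketch {

    /**
     * Applies the filters from findLocalAddress to an ordered candidate list: skip link-local
     * and loopback addresses, skip non-IPv4 addresses, and return the first one left.
     */
    static Optional<InetAddress> firstUsableIpv4(List<InetAddress> candidates) {
        for (InetAddress address : candidates) {
            if (address.isLinkLocalAddress() || address.isLoopbackAddress()) {
                continue;
            }
            if (!(address instanceof Inet4Address)) {
                continue;
            }
            return Optional.of(address);
        }
        return Optional.empty();
    }

    /** Parses an IP literal; for literals, getByName only validates the format (no DNS). */
    static InetAddress literal(String ip) {
        try {
            return InetAddress.getByName(ip);
        } catch (UnknownHostException e) {
            throw new IllegalArgumentException("Not an IP literal: " + ip, e);
        }
    }
}
```

Keeping the filter pure like this makes the selection rules testable without depending on the network configuration of the machine running the tests.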