This is an automated email from the ASF dual-hosted git repository.
yaozhq pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
     new 6526b69b [ISSUE-592] feat: Add a Streaming Source for Hive: add stream hive reader (#594)
6526b69b is described below
commit 6526b69bbce536d4e918ec9fe6f6c32b9f05cb20
Author: DukeWangYu <[email protected]>
AuthorDate: Mon Aug 25 11:33:23 2025 +0800
    [ISSUE-592] feat: Add a Streaming Source for Hive: add stream hive reader (#594)
* add stream hive reader
* add licenses to test case
* refine codes after review
---
.../geaflow/dsl/connector/api/FetchData.java | 11 ++++
.../geaflow/dsl/connector/hive/HiveReader.java | 36 ++++++++++-
.../dsl/connector/hive/HiveTableSource.java | 6 ++
.../dsl/connector/hive/HiveTableSourceTest.java | 69 +++++++++++++++-------
.../geaflow/dsl/runtime/query/HiveSourceTest.java | 36 +++++++++++
.../src/test/resources/query/hive_source_001.sql | 45 ++++++++++++++
6 files changed, 180 insertions(+), 23 deletions(-)
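
The core of the patch is the windowed read loop added to HiveReader.read(long, String[]): instead of rejecting any window smaller than Long.MAX_VALUE, it now drains up to windowSize rows per call and advances a running fetchOffset. Below is a self-contained sketch of that technique over a plain iterator; the class and names in it are illustrative, and only the read/offset pattern mirrors the patch.

    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;

    // Toy model of HiveReader's windowed read: drain at most windowSize
    // elements per call and keep a running offset for checkpointing.
    public class WindowedReadSketch {
        private final Iterator<String> source;
        private long fetchOffset = 0L;

        WindowedReadSketch(Iterator<String> source) {
            this.source = source;
        }

        List<String> read(long windowSize) {
            long fetchCnt = 0L;
            List<String> rows = new ArrayList<>();
            while (fetchCnt < windowSize && source.hasNext()) {
                rows.add(source.next());
                fetchCnt++;
            }
            fetchOffset += fetchCnt; // the offset the next fetch resumes from
            return rows;
        }

        public static void main(String[] args) {
            WindowedReadSketch reader =
                new WindowedReadSketch(List.of("a", "b", "c", "d", "e").iterator());
            List<String> window;
            while (!(window = reader.read(2)).isEmpty()) {
                System.out.println(window); // [a, b] then [c, d] then [e]
            }
        }
    }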
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
index 89451288..cc042dcf 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-api/src/main/java/org/apache/geaflow/dsl/connector/api/FetchData.java
@@ -73,6 +73,17 @@ public class FetchData<T> implements Serializable {
         return nextOffset;
     }
 
+    public void seek(long seekPos) {
+        long toSkip = seekPos;
+        while (toSkip > 0) {
+            if (!dataIterator.hasNext()) {
+                throw new RuntimeException("seek pos: " + seekPos + " exceeds the split size: " + (seekPos - toSkip));
+            }
+            dataIterator.next();
+            toSkip--;
+        }
+    }
+
     /**
      * Returns true if the fetch has finished for the partition.
      */
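
FetchData.seek(...) above simply fast-forwards the data iterator, failing once the requested position passes the end of the split; the error message reports how many rows were actually available. A standalone illustration of that contract (the iterator contents and the helper are made up for the example):

    import java.util.Iterator;
    import java.util.List;

    public class SeekSketch {
        // Same skip-and-check loop as FetchData.seek(): consume seekPos
        // elements, failing if the iterator runs dry first.
        static <T> void seek(Iterator<T> it, long seekPos) {
            long toSkip = seekPos;
            while (toSkip > 0) {
                if (!it.hasNext()) {
                    throw new RuntimeException("seek pos: " + seekPos
                        + " exceeds the split size: " + (seekPos - toSkip));
                }
                it.next();
                toSkip--;
            }
        }

        public static void main(String[] args) {
            Iterator<Integer> it = List.of(10, 20, 30).iterator();
            seek(it, 2);                   // skip the first two rows
            System.out.println(it.next()); // prints 30
        }
    }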
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
index 175929e1..62f648b6 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveReader.java
@@ -20,12 +20,15 @@
package org.apache.geaflow.dsl.connector.hive;
import java.io.IOException;
+import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;
+import org.apache.geaflow.common.exception.GeaflowRuntimeException;
import org.apache.geaflow.common.utils.ClassUtil;
import org.apache.geaflow.dsl.common.data.Row;
import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
@@ -50,7 +53,7 @@ public class HiveReader {
     private final RecordReader<Writable, Writable> recordReader;
     private final StructType readSchema;
     private final Deserializer deserializer;
-
+    private long fetchOffset;
     public HiveReader(RecordReader<Writable, Writable> recordReader,
                       StructType readSchema,
                       StorageDescriptor sd, Properties tableProps) {
@@ -59,6 +62,7 @@ public class HiveReader {
                 f -> new TableField(f.getName().toLowerCase(Locale.ROOT), f.getType(), f.isNullable()))
             .collect(Collectors.toList()));
         this.deserializer = ClassUtil.newInstance(sd.getSerdeInfo().getSerializationLib());
+        this.fetchOffset = 0L;
         try {
             org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
             SerDeUtils.initializeSerDe(deserializer, conf, tableProps, null);
@@ -69,11 +73,37 @@ public class HiveReader {
     }
 
     public FetchData<Row> read(long windowSize, String[] partitionValues) {
+        Iterator<Row> hiveIterator = new HiveIterator(recordReader, deserializer, partitionValues, readSchema);
         if (windowSize == Long.MAX_VALUE) {
-            Iterator<Row> hiveIterator = new HiveIterator(recordReader, deserializer, partitionValues, readSchema);
             return FetchData.createBatchFetch(hiveIterator, new HiveOffset(-1L));
         } else {
-            throw new GeaFlowDSLException("Cannot support stream read for hive");
+            long fetchCnt = 0L;
+            List<Row> rows = new ArrayList<>();
+            while (fetchCnt < windowSize) {
+                if (hiveIterator.hasNext()) {
+                    fetchCnt++;
+                    rows.add(hiveIterator.next());
+                } else {
+                    break;
+                }
+            }
+            fetchOffset += fetchCnt;
+            return FetchData.createStreamFetch(rows, new HiveOffset(fetchOffset), fetchCnt < windowSize);
+        }
+    }
+
+    public void seek(long seekPos) {
+        try {
+            Writable key = recordReader.createKey();
+            Writable value = recordReader.createValue();
+            fetchOffset = seekPos;
+            while (seekPos-- > 0) {
+                if (!recordReader.next(key, value)) {
+                    throw new GeaflowRuntimeException("fetch offset is out of range: " + fetchOffset);
+                }
+            }
+        } catch (Exception e) {
+            throw new GeaflowRuntimeException(e);
         }
     }
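
A subtlety in the stream branch above: the third argument to createStreamFetch, fetchCnt < windowSize, is the end-of-split signal. A full window may have more rows behind it, but a short window proves the iterator is drained. A compact check of that logic in plain Java:

    import java.util.Iterator;
    import java.util.List;

    public class FinishFlagSketch {
        public static void main(String[] args) {
            Iterator<Integer> it = List.of(1, 2, 3).iterator();
            long windowSize = 2;
            long fetchCnt = 0;
            while (fetchCnt < windowSize && it.hasNext()) {
                it.next();
                fetchCnt++;
            }
            // The window filled completely, so the split may not be finished yet.
            System.out.println("finished: " + (fetchCnt < windowSize)); // finished: false
        }
    }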
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
index bdf8232f..72f4ad87 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/main/java/org/apache/geaflow/dsl/connector/hive/HiveTableSource.java
@@ -213,6 +213,12 @@ public class HiveTableSource implements TableSource, EnablePartitionPushDown {
             }
             reader = new HiveReader(recordReader, dataSchema, hivePartition.getSd(), tableProps);
             partitionReaders.put(partition.getName(), reader);
+            if (startOffset.isPresent()) {
+                long seekOffset = startOffset.get().getOffset();
+                if (seekOffset > 0) {
+                    reader.seek(seekOffset);
+                }
+            }
         }
         try {
             return (FetchData<T>) reader.read(desireWindowSize, hivePartition.getPartitionValues());
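
With the guard above, a recovered source seeks its freshly created HiveReader to the last checkpointed offset before the first read, so records consumed before the failure are skipped rather than re-emitted. An illustrative resume flow under that assumption (the checkpoint value and split contents are invented):

    import java.util.Iterator;
    import java.util.List;

    public class ResumeSketch {
        public static void main(String[] args) {
            List<String> split = List.of("r0", "r1", "r2", "r3", "r4");
            long checkpointedOffset = 3; // offset persisted by the previous run

            Iterator<String> reader = split.iterator();
            for (long i = 0; i < checkpointedOffset; i++) {
                reader.next(); // HiveReader.seek() does this on the RecordReader
            }
            while (reader.hasNext()) {
                System.out.println(reader.next()); // prints r3 then r4
            }
        }
    }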
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
index bd1140bf..b6a10e30 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-hive/src/test/java/org/apache/geaflow/dsl/connector/hive/HiveTableSourceTest.java
@@ -36,7 +36,9 @@ import org.apache.geaflow.dsl.common.types.TableSchema;
import org.apache.geaflow.dsl.connector.api.FetchData;
import org.apache.geaflow.dsl.connector.api.Partition;
import org.apache.geaflow.dsl.connector.api.serde.TableDeserializer;
+import org.apache.geaflow.dsl.connector.api.window.AbstractFetchWindow;
import org.apache.geaflow.dsl.connector.api.window.AllFetchWindow;
+import org.apache.geaflow.dsl.connector.api.window.SizeFetchWindow;
import org.apache.geaflow.runtime.core.context.DefaultRuntimeContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -66,12 +68,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, expected);
}
@Test
@@ -87,12 +90,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, expected);
}
@Test
@@ -108,12 +112,13 @@ public class HiveTableSourceTest extends BaseHiveTest {
new TableField("name", BinaryStringType.INSTANCE, true),
new TableField("age", IntegerType.INSTANCE, false)
);
-        checkReadHive(ddl, inserts, dataSchema, new StructType(),
-            "[1, jim, 20]\n"
-                + "[2, kate, 18]\n"
-                + "[3, lily, 22]\n"
-                + "[4, lucy, 25]\n"
-                + "[5, jack, 26]");
+        String expected = "[1, jim, 20]\n"
+            + "[2, kate, 18]\n"
+            + "[3, lily, 22]\n"
+            + "[4, lucy, 25]\n"
+            + "[5, jack, 26]";
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), false, expected);
+        checkReadHive(ddl, inserts, dataSchema, new StructType(), true, expected);
}
@Test
@@ -135,12 +140,23 @@ public class HiveTableSourceTest extends BaseHiveTest {
StructType partitionSchema = new StructType(
new TableField("dt", BinaryStringType.INSTANCE, false)
);
-        checkReadHive(ddl, inserts, dataSchema, partitionSchema,
-            "[1, jim, 20, 2023-04-23]\n"
-                + "[2, kate, 18, 2023-04-24]\n"
-                + "[3, lily, 22, 2023-04-24]\n"
-                + "[4, lucy, 25, 2023-04-25]\n"
-                + "[5, jack, 26, 2023-04-26]");
+        String expected = "[1, jim, 20, 2023-04-23]\n"
+            + "[2, kate, 18, 2023-04-24]\n"
+            + "[3, lily, 22, 2023-04-24]\n"
+            + "[4, lucy, 25, 2023-04-25]\n"
+            + "[5, jack, 26, 2023-04-26]";
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, false, expected);
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, true, expected);
}
@Test
@@ -163,16 +179,23 @@ public class HiveTableSourceTest extends BaseHiveTest {
new TableField("hh", BinaryStringType.INSTANCE, false),
new TableField("dt", BinaryStringType.INSTANCE, false)
);
-        checkReadHive(ddl, inserts, dataSchema, partitionSchema,
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, false,
             "[1, jim, 20, 10, 2023-04-23]\n"
                 + "[2, kate, 18, 10, 2023-04-24]\n"
                 + "[3, lily, 22, 11, 2023-04-24]\n"
                 + "[4, lucy, 25, 12, 2023-04-25]\n"
                 + "[5, jack, 26, 13, 2023-04-26]");
+        checkReadHive(ddl, inserts, dataSchema, partitionSchema, true,
+            "[1, jim, 20, 10, 2023-04-23]\n"
+                + "[2, kate, 18, 10, 2023-04-24]\n"
+                + "[3, lily, 22, 11, 2023-04-24]\n"
+                + "[4, lucy, 25, 12, 2023-04-25]\n"
+                + "[5, jack, 26, 13, 2023-04-26]");
}
     private void checkReadHive(String ddl, String inserts, StructType dataSchema,
                                StructType partitionSchema,
+                               boolean isStream,
                                String expectResult) throws IOException {
executeHiveSql("Drop table if exists hive_user");
executeHiveSql(ddl);
@@ -201,6 +224,12 @@ public class HiveTableSourceTest extends BaseHiveTest {
List<Row> readRows = new ArrayList<>();
for (Partition partition : partitions) {
LOGGER.info("partition: {}", partition.getName());
+            AbstractFetchWindow window;
+            if (isStream) {
+                window = new SizeFetchWindow(1, Long.MAX_VALUE);
+            } else {
+                window = new AllFetchWindow(1);
+            }
-            FetchData<Row> fetchData = hiveTableSource.fetch(partition, Optional.empty(),
-                new AllFetchWindow(1));
+            FetchData<Row> fetchData = hiveTableSource.fetch(partition, Optional.empty(), window);
Iterator<Row> rowIterator = fetchData.getDataIterator();
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
new file mode 100644
index 00000000..af57d249
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/java/org/apache/geaflow/dsl/runtime/query/HiveSourceTest.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.runtime.query;
+
+import org.apache.geaflow.common.config.keys.FrameworkConfigKeys;
+import org.testng.annotations.Test;
+
+public class HiveSourceTest {
+
+    @Test(enabled = false)
+    public void testHiveSource_001() throws Exception {
+        QueryTester
+            .build()
+            .withConfig(FrameworkConfigKeys.BATCH_NUMBER_PER_CHECKPOINT.getKey(), 1)
+            .withQueryPath("/query/hive_source_001.sql")
+            .execute()
+            .checkSinkResult();
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
new file mode 100644
index 00000000..6f351179
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-runtime/src/test/resources/query/hive_source_001.sql
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+CREATE TABLE IF NOT EXISTS v_person (
+ src bigint,
+ dst bigint
+) WITH (
+ type='hive',
+ geaflow.dsl.hive.database.name='db1',
+ geaflow.dsl.hive.table.name='test',
+ geaflow.dsl.window.size='1000000',
+ geaflow.dsl.hive.metastore.uris='thrift://localhost:9083'
+);
+
+
+CREATE TABLE IF NOT EXISTS tbl_result (
+ src bigint,
+ dst bigint
+) WITH (
+ type='file',
+ geaflow.dsl.file.path='${target}'
+);
+
+INSERT INTO tbl_result
+SELECT
+ src,
+ dst
+FROM v_person
+;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]