This is an automated email from the ASF dual-hosted git repository.
loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git
The following commit(s) were added to refs/heads/master by this push:
new 3c7c814bb feat: Write data into ODPS with dynamic partition (#666)
3c7c814bb is described below
commit 3c7c814bb3c9ff953f4f41e67eb8ec92b5e8c246
Author: moses <[email protected]>
AuthorDate: Tue Dec 2 19:46:21 2025 +0800
feat: Write data into ODPS with dynamic partition (#666)
* feat: Write data into ODPS with dynamic partition
* fix partition extractor
* address
* minor fix
* get table schema
* minor fix
---
.../connector/odps/DefaultPartitionExtractor.java | 164 +++++++++++++++++++++
.../geaflow/dsl/connector/odps/OdpsConfigKeys.java | 6 +
.../geaflow/dsl/connector/odps/OdpsTableSink.java | 147 +++++++++---------
.../dsl/connector/odps/PartitionExtractor.java | 35 +++++
.../dsl/connector/odps/PartitionWriter.java | 66 +++++++++
.../odps/DefaultPartitionExtractorTest.java | 116 +++++++++++++++
.../dsl/connector/odps/OdpsTableSourceTest.java | 20 +--
7 files changed, 470 insertions(+), 84 deletions(-)
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java
new file mode 100644
index 000000000..d09a37e46
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import com.aliyun.odps.Column;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.types.StructType;
+
+public class DefaultPartitionExtractor implements PartitionExtractor {
+
+ private static final String DEFAULT_SEPARATOR_PATTERN = "[,/]";
+ private static final String QUOTE_SEPARATOR_PATTERN = "[`'\"]";
+ private static final String EQUAL_SEPARATOR = "=";
+ private static final String COMMA_SEPARATOR = ",";
+ private static final String SLASH_SEPARATOR = "/";
+ private static final String DYNAMIC_KEY_PREFIX = "$";
+
+ // partition spec separator
+ private final String separator;
+ // all partition keys
+ private final String[] keys;
+ // dynamic fields index
+ private final int[] columns;
+ // dynamic field types
+ private final IType<?>[] types;
+ // constant fields, values.length should be equal to keys.length
+ // if values[i] is null, it means the i-th key is a dynamic field
+ private final String[] values;
+
+ /**
+ * Create a partition extractor.
+ * @param partitionColumns partition columns
+ * @param schema the input schema
+ * @return the partition extractor
+ */
+    public static PartitionExtractor create(List<Column> partitionColumns, StructType schema) {
+ if (partitionColumns == null || partitionColumns.isEmpty()) {
+ return row -> "";
+ }
+ int[] columns = new int[partitionColumns.size()];
+ IType<?>[] types = new IType<?>[partitionColumns.size()];
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < partitionColumns.size(); i++) {
+ String partitionColumn = partitionColumns.get(i).getName();
+ int index = schema.indexOf(partitionColumn);
+ if (index < 0) {
+                throw new IllegalArgumentException("Partition column " + partitionColumn + " not found in schema");
+ }
+ columns[i] = index;
+ types[i] = schema.getType(index);
+            sb.append(partitionColumn).append(EQUAL_SEPARATOR).append(DYNAMIC_KEY_PREFIX)
+                .append(partitionColumn).append(COMMA_SEPARATOR);
+ }
+        return new DefaultPartitionExtractor(sb.substring(0, sb.length() - 1), columns, types);
+ }
+
+ /**
+ * Create a partition extractor.
+ * @param spec partition spec, like "dt=$dt,hh=$hh"
+ * @param schema the input schema
+ * @return the partition extractor
+ */
+ public static PartitionExtractor create(String spec, StructType schema) {
+ if (spec == null || spec.isEmpty()) {
+ return row -> "";
+ }
+ String[] groups = spec.split(DEFAULT_SEPARATOR_PATTERN);
+ List<Integer> index = new ArrayList<>();
+ List<IType<?>> types = new ArrayList<>();
+ for (String group : groups) {
+ String[] kv = group.split(EQUAL_SEPARATOR);
+ if (kv.length != 2) {
+ throw new IllegalArgumentException("Invalid partition spec.");
+ }
+ String k = kv[0].trim();
+ String v = unquoted(kv[1].trim());
+ if (k.isEmpty() || v.isEmpty()) {
+ throw new IllegalArgumentException("Invalid partition spec.");
+ }
+ if (v.startsWith(DYNAMIC_KEY_PREFIX)) {
+ int val = schema.indexOf(v.substring(1));
+ if (val != -1) {
+ index.add(val);
+ types.add(schema.getType(val));
+ }
+ }
+ }
+        return new DefaultPartitionExtractor(spec, index.stream().mapToInt(i -> i).toArray(), types.toArray(new IType[0]));
+ }
+
+    public DefaultPartitionExtractor(String spec, int[] columns, IType<?>[] types) {
+ this.columns = columns;
+ this.types = types;
+ if (spec == null) {
+            throw new IllegalArgumentException("Argument 'spec' cannot be null");
+ }
+ String[] groups = spec.split(DEFAULT_SEPARATOR_PATTERN);
+        this.separator = spec.contains(COMMA_SEPARATOR) ? COMMA_SEPARATOR : SLASH_SEPARATOR;
+ this.keys = new String[groups.length];
+ this.values = new String[groups.length];
+ for (int i = 0; i < groups.length; i++) {
+ String[] kv = groups[i].split(EQUAL_SEPARATOR);
+ if (kv.length != 2) {
+ throw new IllegalArgumentException("Invalid partition spec.");
+ }
+ String k = kv[0].trim();
+ String v = unquoted(kv[1].trim());
+ if (k.isEmpty() || v.isEmpty()) {
+ throw new IllegalArgumentException("Invalid partition spec.");
+ }
+ this.keys[i] = k;
+ this.values[i] = v.startsWith(DYNAMIC_KEY_PREFIX) ? null : v;
+ }
+ }
+
+ /**
+ * Unquote the string.
+ * @param s the string
+ * @return the unquoted string
+ */
+ public static String unquoted(String s) {
+ return s.replaceAll(QUOTE_SEPARATOR_PATTERN, "");
+ }
+
+ @Override
+ public String extractPartition(Row row) {
+ StringBuilder sb = new StringBuilder();
+ // dynamic field
+ int col = 0;
+ for (int i = 0; i < keys.length; i++) {
+ sb.append(keys[i]).append(EQUAL_SEPARATOR);
+ if (values[i] == null) {
+ sb.append(row.getField(columns[col], types[col]));
+ col++;
+ } else {
+ sb.append(values[i]);
+ }
+ if (i < keys.length - 1) {
+ sb.append(separator);
+ }
+ }
+ return sb.toString();
+ }
+}
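A quick illustration of how the extractor above resolves a dynamic spec against a row (a sketch only; constructor arguments mirror those exercised in DefaultPartitionExtractorTest further down, with imports as in that test file):

    // Spec "dt=$dt,hh=$hh": both keys are dynamic, bound to schema
    // indices 1 and 2; the ',' in the spec selects the ',' separator.
    PartitionExtractor extractor = new DefaultPartitionExtractor(
        "dt=$dt,hh=$hh", new int[]{1, 2},
        new IType[]{IntegerType.INSTANCE, IntegerType.INSTANCE});
    Row row = ObjectRow.create(1, 20251120, 10, 3.14, "a11");
    // Dynamic values are read from the row; a constant in the spec
    // (e.g. "dt=20251120,hh=$hh") would be emitted verbatim instead.
    extractor.extractPartition(row);   // -> "dt=20251120,hh=10"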
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
index 3b8e9b5b1..753edc09e 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
@@ -48,6 +48,7 @@ public class OdpsConfigKeys {
.noDefaultValue()
.description("The odps endpoint.");
+ @Deprecated
public static final ConfigKey GEAFLOW_DSL_ODPS_PARTITION_SPEC = ConfigKeys
.key("geaflow.dsl.odps.partition.spec")
.defaultValue("")
@@ -58,6 +59,11 @@ public class OdpsConfigKeys {
.defaultValue(1000)
.description("The buffer size of odps sink buffer.");
+    public static final ConfigKey GEAFLOW_DSL_ODPS_SINK_FLUSH_INTERVAL_MS = ConfigKeys
+ .key("geaflow.dsl.odps.sink.flush.interval.ms")
+ .defaultValue(10000)
+ .description("The flush interval of odps sink buffer.");
+
public static final ConfigKey GEAFLOW_DSL_ODPS_TIMEOUT_SECONDS = ConfigKeys
.key("geaflow.dsl.odps.timeout.seconds")
.defaultValue(60)
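For context, the partition spec key above is now deprecated in favor of deriving partitions from the table schema, but it still works and may mix constant and dynamic ($-prefixed) values. An illustrative, properties-style configuration (key strings as defined in this file; the values are examples, not recommendations):

    geaflow.dsl.odps.partition.spec = dt=20251120,hh=$hh
    geaflow.dsl.odps.sink.flush.interval.ms = 10000
    geaflow.dsl.odps.timeout.seconds = 60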
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
index 8d331dc0e..e6c22bc70 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
@@ -23,25 +23,24 @@ import com.aliyun.odps.Column;
import com.aliyun.odps.Odps;
import com.aliyun.odps.PartitionSpec;
import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
import com.aliyun.odps.account.Account;
import com.aliyun.odps.account.AliyunAccount;
import com.aliyun.odps.data.ArrayRecord;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.data.RecordWriter;
import com.aliyun.odps.tunnel.TableTunnel;
-import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
import com.aliyun.odps.tunnel.TunnelException;
-import com.aliyun.odps.utils.StringUtils;
import com.google.common.base.Preconditions;
import com.google.common.base.Strings;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
import org.apache.geaflow.api.context.RuntimeContext;
import org.apache.geaflow.common.config.Configuration;
import org.apache.geaflow.dsl.common.data.Row;
@@ -56,24 +55,24 @@ public class OdpsTableSink implements TableSink {
    private static final Logger LOGGER = LoggerFactory.getLogger(OdpsTableSink.class);
private int bufferSize = 1000;
+ private int flushIntervalMs = Integer.MAX_VALUE;
+
private int timeoutSeconds = 60;
private String endPoint;
private String project;
private String tableName;
private String accessKey;
private String accessId;
- private String partitionSpec;
- private String shardNamePrefix;
private StructType schema;
+ private String partitionSpec;
- private transient Odps odps;
- private transient Table table;
private transient TableTunnel tunnel;
- private transient UploadSession uploadSession;
- private transient RecordWriter writer;
private transient Column[] recordColumns;
private transient int[] columnIndex;
- private transient List<Object[]> buffer;
+ private transient PartitionExtractor partitionExtractor;
+ private transient Map<String, PartitionWriter> partitionWriters;
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
@Override
public void init(Configuration tableConf, StructType schema) {
@@ -90,53 +89,46 @@ public class OdpsTableSink implements TableSink {
        this.accessKey = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_ACCESS_KEY);
        this.accessId = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_ACCESS_ID);
        this.partitionSpec = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_PARTITION_SPEC);
- this.shardNamePrefix = project + "-" + tableName + "-";
        int bufferSize = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_SINK_BUFFER_SIZE);
if (bufferSize > 0) {
this.bufferSize = bufferSize;
}
+        int flushIntervalMs = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_SINK_FLUSH_INTERVAL_MS);
+ if (flushIntervalMs > 0) {
+ this.flushIntervalMs = flushIntervalMs;
+ }
        int timeoutSeconds = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_TIMEOUT_SECONDS);
if (timeoutSeconds > 0) {
this.timeoutSeconds = timeoutSeconds;
}
checkArguments();
-        LOGGER.info("init odps table sink, endPoint : {}, project : {}, tableName : {}, "
-            + "partitionSpec: {}", endPoint, project, tableName, partitionSpec);
+        LOGGER.info("init odps table sink, endPoint : {}, project : {}, tableName : {}",
+            endPoint, project, tableName);
}
@Override
public void open(RuntimeContext context) {
- this.buffer = new ArrayList<>(bufferSize);
Account account = new AliyunAccount(accessId, accessKey);
- this.odps = new Odps(account);
+ Odps odps = new Odps(account);
odps.setEndpoint(endPoint);
odps.setDefaultProject(project);
- this.table = odps.tables().get(project, tableName);
this.tunnel = new TableTunnel(odps);
- if (StringUtils.isEmpty(partitionSpec)) {
-            throw new GeaFlowDSLException("For ODPS sink, partition spec cannot be empty.");
- }
- PartitionSpec usePartitionSpec = new PartitionSpec(partitionSpec);
- ExecutorService executor = Executors.newSingleThreadExecutor();
- Future<UploadSession> future = executor.submit(() -> {
- try {
-                return tunnel.createUploadSession(project, tableName, usePartitionSpec);
- } catch (TunnelException e) {
- throw new GeaFlowDSLException("Cannot get odps session.", e);
- }
- });
- try {
-            this.uploadSession = future.get(this.timeoutSeconds, TimeUnit.SECONDS);
- } catch (Exception e) {
-            throw new GeaFlowDSLException("Cannot list partitions from ODPS, endPoint: " + this.endPoint, e);
- }
-        this.recordColumns = this.uploadSession.getSchema().getColumns().toArray(new Column[0]);
+ Table table = odps.tables().get(tableName);
+ TableSchema tableSchema = table.getSchema();
+ this.recordColumns = tableSchema.getColumns().toArray(new Column[0]);
this.columnIndex = new int[recordColumns.length];
for (int i = 0; i < this.recordColumns.length; i++) {
String columnName = this.recordColumns[i].getName();
columnIndex[i] = this.schema.indexOf(columnName);
}
+ if (this.partitionSpec != null && !this.partitionSpec.isEmpty()) {
+            this.partitionExtractor = DefaultPartitionExtractor.create(this.partitionSpec, schema);
+ } else {
+ List<Column> partitionColumns = tableSchema.getPartitionColumns();
+            this.partitionExtractor = DefaultPartitionExtractor.create(partitionColumns, schema);
+ }
+ this.partitionWriters = new HashMap<>();
}
@Override
@@ -149,58 +141,66 @@ public class OdpsTableSink implements TableSink {
values[i] = null;
}
}
- buffer.add(values);
- if (buffer.size() >= bufferSize) {
- try {
- flush();
- } catch (Exception e) {
- throw new IOException(e);
- }
- }
+        PartitionWriter writer = createOrGetWriter(partitionExtractor.extractPartition(row));
+ writer.write(new ArrayRecord(recordColumns, values));
}
@Override
public void finish() throws IOException {
- try {
- flush();
- } catch (Exception e) {
- throw new IOException(e);
- }
+ flush();
}
@Override
public void close() {
LOGGER.info("close.");
- if (writer != null) {
- try {
- writer.close();
- } catch (IOException e) {
-                throw new GeaFlowDSLException("Error when closing Odps writer.", e);
+ flush();
+ }
+
+ private void flush() {
+ try {
+ for (PartitionWriter writer : partitionWriters.values()) {
+ writer.flush();
}
+ } catch (IOException e) {
+ throw new RuntimeException("Flush data error.", e);
}
}
- private void flush() throws IOException {
- if (buffer.isEmpty()) {
- return;
+ /**
+     * Get the writer for the given partition, creating it if absent.
+ * @param partition the partition
+ * @return a writer
+ */
+ private PartitionWriter createOrGetWriter(String partition) {
+ PartitionWriter partitionWriter = partitionWriters.get(partition);
+ if (partitionWriter == null) {
+            TableTunnel.StreamUploadSession session = createUploadSession(partition);
+            partitionWriter = new PartitionWriter(session, bufferSize, flushIntervalMs);
+ partitionWriters.put(partition, partitionWriter);
}
- List<Object[]> flushBuffer = buffer;
- buffer = new ArrayList<>(bufferSize);
- List<Record> records = new ArrayList<>();
- for (Object[] value : flushBuffer) {
- Record record = new ArrayRecord(recordColumns, value);
- records.add(record);
- }
- if (writer == null) {
+ return partitionWriter;
+ }
+
+ /**
+ * Create an upload session.
+ * @param partition the partition
+ * @return an upload session
+ */
+    private TableTunnel.StreamUploadSession createUploadSession(@Nullable String partition) {
+        Future<TableTunnel.StreamUploadSession> future = executor.submit(() -> {
try {
-                assert uploadSession != null : "The uploadSession has not been open.";
- writer = uploadSession.openBufferedWriter();
+ if (partition == null || partition.isEmpty()) {
+                    return tunnel.createStreamUploadSession(project, tableName);
+ }
+                return tunnel.createStreamUploadSession(project, tableName, new PartitionSpec(partition));
} catch (TunnelException e) {
- throw new GeaFlowDSLException("Cannot get Odps writer.", e);
+ throw new GeaFlowDSLException("Cannot get odps session.", e);
}
- }
- for (Record record : records) {
- writer.write(record);
+ });
+ try {
+ return future.get(this.timeoutSeconds, TimeUnit.SECONDS);
+ } catch (Exception e) {
+            throw new GeaFlowDSLException("Create stream upload session with endpoint " + this.endPoint + " failed", e);
}
}
@@ -208,8 +208,7 @@ public class OdpsTableSink implements TableSink {
        Preconditions.checkArgument(!Strings.isNullOrEmpty(endPoint), "endPoint is null");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(project), "project is null");
        Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName), "tableName is null");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(endPoint), "accessKey is null");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(project), "accessId is null");
-        Preconditions.checkNotNull(new PartitionSpec(partitionSpec));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(accessId), "accessId is null");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(accessKey), "accessKey is null");
}
}
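In summary, the sink no longer opens one UploadSession for a fixed partition spec; each row is routed to a lazily created streaming writer keyed by its extracted partition. The write path, condensed (a sketch; names as in the class above):

    // Per-row routing inside write(Row): extract the partition string,
    // reuse or create its PartitionWriter, then append the record.
    String partition = partitionExtractor.extractPartition(row);   // e.g. "dt=20251120,hh=10"
    PartitionWriter writer = createOrGetWriter(partition);         // cached in partitionWriters
    writer.write(new ArrayRecord(recordColumns, values));          // flushed by size or interval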
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java
new file mode 100644
index 000000000..9552e448b
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import org.apache.geaflow.dsl.common.data.Row;
+
+/**
+ * ODPS partition extractor.
+ */
+public interface PartitionExtractor {
+
+ /**
+     * Extract the partition string from a row.
+ * @param row the input row
+ * @return the partition
+ */
+ String extractPartition(Row row);
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java
new file mode 100644
index 000000000..2d78cf742
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import java.io.IOException;
+
+public class PartitionWriter {
+
+ private final TableTunnel.StreamRecordPack recordPack;
+ private final int batchSize;
+ private final int flushIntervalMs;
+ private long lastFlushTime;
+
+    public PartitionWriter(TableTunnel.StreamUploadSession uploadSession, int batchSize, int flushIntervalMs) {
+ try {
+ this.recordPack = uploadSession.newRecordPack();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ this.batchSize = batchSize;
+ this.flushIntervalMs = flushIntervalMs;
+ this.lastFlushTime = System.currentTimeMillis();
+ }
+
+ /**
+     * Write a record to the stream. If the batch size or the flush
+     * interval is reached, flush the stream.
+ * @param record The record to write.
+ * @throws IOException If an I/O error occurs.
+ */
+ public void write(Record record) throws IOException {
+ recordPack.append(record);
+ if (recordPack.getRecordCount() >= batchSize
+                || System.currentTimeMillis() - lastFlushTime > flushIntervalMs) {
+ recordPack.flush();
+ lastFlushTime = System.currentTimeMillis();
+ }
+ }
+
+ /**
+ * Flush the stream.
+ * @throws IOException If an I/O error occurs.
+ */
+ public void flush() throws IOException {
+ recordPack.flush();
+ }
+}
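The buffering policy above flushes on whichever bound is hit first: record count or elapsed time. Isolated as a pure function for clarity (shouldFlush is hypothetical; PartitionWriter inlines this check in write()):

    static boolean shouldFlush(long recordCount, int batchSize,
                               long nowMs, long lastFlushMs, int flushIntervalMs) {
        // Flush when the pack is full, or when it has been sitting
        // longer than the configured interval since the last flush.
        return recordCount >= batchSize || nowMs - lastFlushMs > flushIntervalMs;
    }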
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java
new file mode 100644
index 000000000..0532cc9c8
--- /dev/null
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.primitive.IntegerType;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DefaultPartitionExtractorTest {
+
+ @Test
+ public void testExtract() {
+ Row row1 = ObjectRow.create(1, 2025111, 10, 3.14, "a11");
+ Row row2 = ObjectRow.create(1, 2025111, 11, 3.14, "b11");
+ Row row3 = ObjectRow.create(1, 2025111, 12, 3.14, "c11");
+ Row row4 = ObjectRow.create(1, 2025111, 13, 3.14, "d11");
+ Row row5 = ObjectRow.create(1, 2025111, 14, 3.14, "e11");
+ Row row6 = ObjectRow.create(1, 2025110, 11, 3.14, "null");
+
+ String spec1 = "dt=$dt,hh=$hh,biz=$biz";
+ DefaultPartitionExtractor extractor1 = new DefaultPartitionExtractor(
+ spec1, new int[]{1, 2, 4},
+            new IType[]{IntegerType.INSTANCE, IntegerType.INSTANCE, StringType.INSTANCE});
+        Assert.assertEquals("dt=2025111,hh=10,biz=a11", extractor1.extractPartition(row1));
+        Assert.assertEquals("dt=2025111,hh=11,biz=b11", extractor1.extractPartition(row2));
+        Assert.assertEquals("dt=2025111,hh=12,biz=c11", extractor1.extractPartition(row3));
+        Assert.assertEquals("dt=2025111,hh=13,biz=d11", extractor1.extractPartition(row4));
+        Assert.assertEquals("dt=2025111,hh=14,biz=e11", extractor1.extractPartition(row5));
+        Assert.assertEquals("dt=2025110,hh=11,biz=null", extractor1.extractPartition(row6));
+
+ String spec2 = "dt=$dt/hh=$hh/biz=$biz";
+ DefaultPartitionExtractor extractor2 = new DefaultPartitionExtractor(
+ spec2, new int[]{1, 2, 4},
+            new IType[]{IntegerType.INSTANCE, IntegerType.INSTANCE, StringType.INSTANCE});
+        Assert.assertEquals("dt=2025111/hh=10/biz=a11", extractor2.extractPartition(row1));
+        Assert.assertEquals("dt=2025111/hh=11/biz=b11", extractor2.extractPartition(row2));
+        Assert.assertEquals("dt=2025111/hh=12/biz=c11", extractor2.extractPartition(row3));
+        Assert.assertEquals("dt=2025111/hh=13/biz=d11", extractor2.extractPartition(row4));
+        Assert.assertEquals("dt=2025111/hh=14/biz=e11", extractor2.extractPartition(row5));
+        Assert.assertEquals("dt=2025110/hh=11/biz=null", extractor2.extractPartition(row6));
+
+
+ String spec3 = "dt=$dt";
+ DefaultPartitionExtractor extractor3 = new DefaultPartitionExtractor(
+ spec3, new int[]{1},
+ new IType[]{IntegerType.INSTANCE});
+ Assert.assertEquals("dt=2025111", extractor3.extractPartition(row1));
+ Assert.assertEquals("dt=2025111", extractor3.extractPartition(row2));
+ Assert.assertEquals("dt=2025111", extractor3.extractPartition(row3));
+ Assert.assertEquals("dt=2025111", extractor3.extractPartition(row4));
+ Assert.assertEquals("dt=2025111", extractor3.extractPartition(row5));
+ Assert.assertEquals("dt=2025110", extractor3.extractPartition(row6));
+
+
+ String spec4 = "dt=20251120";
+ DefaultPartitionExtractor extractor4 = new DefaultPartitionExtractor(
+ spec4, new int[]{},
+ new IType[]{});
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row1));
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row2));
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row3));
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row4));
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row5));
+ Assert.assertEquals("dt=20251120", extractor4.extractPartition(row6));
+
+ String spec5 = "dt=20251120,hh=$hh";
+ DefaultPartitionExtractor extractor5 = new DefaultPartitionExtractor(
+ spec5, new int[]{2},
+ new IType[]{IntegerType.INSTANCE});
+        Assert.assertEquals("dt=20251120,hh=10", extractor5.extractPartition(row1));
+        Assert.assertEquals("dt=20251120,hh=11", extractor5.extractPartition(row2));
+        Assert.assertEquals("dt=20251120,hh=12", extractor5.extractPartition(row3));
+        Assert.assertEquals("dt=20251120,hh=13", extractor5.extractPartition(row4));
+        Assert.assertEquals("dt=20251120,hh=14", extractor5.extractPartition(row5));
+        Assert.assertEquals("dt=20251120,hh=11", extractor5.extractPartition(row6));
+
+        PartitionExtractor extractor6 = DefaultPartitionExtractor.create("", null);
+ Assert.assertEquals("", extractor6.extractPartition(row1));
+ Assert.assertEquals("", extractor6.extractPartition(row2));
+ Assert.assertEquals("", extractor6.extractPartition(row3));
+ Assert.assertEquals("", extractor6.extractPartition(row4));
+ Assert.assertEquals("", extractor6.extractPartition(row5));
+ Assert.assertEquals("", extractor6.extractPartition(row6));
+
+ }
+
+ @Test
+ public void testUnquoted() {
+ Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("dt"));
+ Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("`dt`"));
+ Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("'dt'"));
+        Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("\"dt\""));
+ }
+}
diff --git
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
index dfeb552d7..a31756ef7 100644
---
a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
+++
b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
@@ -133,15 +133,15 @@ public class OdpsTableSourceTest {
@Test
public void testOdpsConnectorUtils() {
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.SMALLINT, LongType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.INT, LongType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.FLOAT, FloatType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DOUBLE, DoubleType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.BOOLEAN, BooleanType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.STRING, BinaryStringType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.BINARY, ByteType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DECIMAL, DecimalType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.VOID, VoidType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DATE, TimestampType.INSTANCE), true);
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.SMALLINT, LongType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.INT, LongType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.FLOAT, FloatType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DOUBLE, DoubleType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.BOOLEAN, BooleanType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.STRING, BinaryStringType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.BINARY, ByteType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DECIMAL, DecimalType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.VOID, VoidType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DATE, TimestampType.INSTANCE));
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]