This is an automated email from the ASF dual-hosted git repository.

loogn pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/geaflow.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c7c814bb feat: Write data into ODPS with dynamic partition (#666)
3c7c814bb is described below

commit 3c7c814bb3c9ff953f4f41e67eb8ec92b5e8c246
Author: moses <[email protected]>
AuthorDate: Tue Dec 2 19:46:21 2025 +0800

    feat: Write data into ODPS with dynamic partition (#666)
    
    * feat: Write data into ODPS with dynamic partition
    
    * fix partition extractor
    
    * address
    
    * minor fix
    
    * get table schema
    
    * minor fix
---
 .../connector/odps/DefaultPartitionExtractor.java  | 164 +++++++++++++++++++++
 .../geaflow/dsl/connector/odps/OdpsConfigKeys.java |   6 +
 .../geaflow/dsl/connector/odps/OdpsTableSink.java  | 147 +++++++++---------
 .../dsl/connector/odps/PartitionExtractor.java     |  35 +++++
 .../dsl/connector/odps/PartitionWriter.java        |  66 +++++++++
 .../odps/DefaultPartitionExtractorTest.java        | 116 +++++++++++++++
 .../dsl/connector/odps/OdpsTableSourceTest.java    |  20 +--
 7 files changed, 470 insertions(+), 84 deletions(-)

diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java
new file mode 100644
index 000000000..d09a37e46
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractor.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import com.aliyun.odps.Column;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.types.StructType;
+
+public class DefaultPartitionExtractor implements PartitionExtractor {
+
+    private static final String DEFAULT_SEPARATOR_PATTERN = "[,/]";
+    private static final String QUOTE_SEPARATOR_PATTERN = "[`'\"]";
+    private static final String EQUAL_SEPARATOR = "=";
+    private static final String COMMA_SEPARATOR = ",";
+    private static final String SLASH_SEPARATOR = "/";
+    private static final String DYNAMIC_KEY_PREFIX = "$";
+
+    // partition spec separator
+    private final String separator;
+    // all partition keys
+    private final String[] keys;
+    // dynamic fields index
+    private final int[] columns;
+    // dynamic field types
+    private final IType<?>[] types;
+    // constant fields, values.length should be equal to keys.length
+    // if values[i] is null, it means the i-th key is a dynamic field
+    private final String[] values;
+
+    /**
+     * Create a partition extractor.
+     * @param partitionColumns partition columns
+     * @param schema the input schema
+     * @return the partition extractor
+     */
+    public static PartitionExtractor create(List<Column> partitionColumns, StructType schema) {
+        if (partitionColumns == null || partitionColumns.isEmpty()) {
+            return row -> "";
+        }
+        int[] columns = new int[partitionColumns.size()];
+        IType<?>[] types = new IType<?>[partitionColumns.size()];
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < partitionColumns.size(); i++) {
+            String partitionColumn = partitionColumns.get(i).getName();
+            int index = schema.indexOf(partitionColumn);
+            if (index < 0) {
+                throw new IllegalArgumentException("Partition column " + 
partitionColumn + " not found in schema");
+            }
+            columns[i] = index;
+            types[i] = schema.getType(index);
+            sb.append(partitionColumn).append(EQUAL_SEPARATOR).append(DYNAMIC_KEY_PREFIX)
+                    .append(partitionColumn).append(COMMA_SEPARATOR);
+        }
+        return new DefaultPartitionExtractor(sb.substring(0, sb.length() - 1), columns, types);
+    }
+
+    /**
+     * Create a partition extractor.
+     * @param spec partition spec, like "dt=$dt,hh=$hh"
+     * @param schema the input schema
+     * @return the partition extractor
+     */
+    public static PartitionExtractor create(String spec, StructType schema) {
+        if (spec == null || spec.isEmpty()) {
+            return row -> "";
+        }
+        String[] groups = spec.split(DEFAULT_SEPARATOR_PATTERN);
+        List<Integer> index = new ArrayList<>();
+        List<IType<?>> types = new ArrayList<>();
+        for (String group : groups) {
+            String[] kv = group.split(EQUAL_SEPARATOR);
+            if (kv.length != 2) {
+                throw new IllegalArgumentException("Invalid partition spec.");
+            }
+            String k = kv[0].trim();
+            String v = unquoted(kv[1].trim());
+            if (k.isEmpty() || v.isEmpty()) {
+                throw new IllegalArgumentException("Invalid partition spec.");
+            }
+            if (v.startsWith(DYNAMIC_KEY_PREFIX)) {
+                int val = schema.indexOf(v.substring(1));
+                if (val != -1) {
+                    index.add(val);
+                    types.add(schema.getType(val));
+                }
+            }
+        }
+        return new DefaultPartitionExtractor(spec, index.stream().mapToInt(i -> i).toArray(), types.toArray(new IType[0]));
+    }
+
+    public DefaultPartitionExtractor(String spec, int[] columns, IType<?>[] types) {
+        this.columns = columns;
+        this.types = types;
+        if (spec == null) {
+            throw new IllegalArgumentException("Argument 'spec' cannot be 
null");
+        }
+        String[] groups = spec.split(DEFAULT_SEPARATOR_PATTERN);
+        this.separator = spec.contains(COMMA_SEPARATOR) ? COMMA_SEPARATOR : SLASH_SEPARATOR;
+        this.keys = new String[groups.length];
+        this.values = new String[groups.length];
+        for (int i = 0; i < groups.length; i++) {
+            String[] kv = groups[i].split(EQUAL_SEPARATOR);
+            if (kv.length != 2) {
+                throw new IllegalArgumentException("Invalid partition spec.");
+            }
+            String k = kv[0].trim();
+            String v = unquoted(kv[1].trim());
+            if (k.isEmpty() || v.isEmpty()) {
+                throw new IllegalArgumentException("Invalid partition spec.");
+            }
+            this.keys[i] = k;
+            this.values[i] = v.startsWith(DYNAMIC_KEY_PREFIX) ? null : v;
+        }
+    }
+
+    /**
+     * Unquote the string.
+     * @param s the string
+     * @return the unquoted string
+     */
+    public static String unquoted(String s) {
+        return s.replaceAll(QUOTE_SEPARATOR_PATTERN, "");
+    }
+
+    @Override
+    public String extractPartition(Row row) {
+        StringBuilder sb = new StringBuilder();
+        // dynamic field
+        int col = 0;
+        for (int i = 0; i < keys.length; i++) {
+            sb.append(keys[i]).append(EQUAL_SEPARATOR);
+            if (values[i] == null) {
+                sb.append(row.getField(columns[col], types[col]));
+                col++;
+            } else {
+                sb.append(values[i]);
+            }
+            if (i < keys.length - 1) {
+                sb.append(separator);
+            }
+        }
+        return sb.toString();
+    }
+}
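
For reference, a minimal usage sketch of the extractor above (the spec, column
indices, and row values are hypothetical; the constructor form mirrors the one
exercised by DefaultPartitionExtractorTest below):

    // Mixed spec: "dt" is a constant, "hh" is dynamic and resolved from
    // column 2 of each row. Indices and types are illustrative.
    DefaultPartitionExtractor extractor = new DefaultPartitionExtractor(
            "dt=20251120,hh=$hh", new int[]{2}, new IType[]{IntegerType.INSTANCE});
    Row row = ObjectRow.create(1, 2025111, 10, 3.14, "a11");
    String partition = extractor.extractPartition(row); // "dt=20251120,hh=10"
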
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
index 3b8e9b5b1..753edc09e 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsConfigKeys.java
@@ -48,6 +48,7 @@ public class OdpsConfigKeys {
         .noDefaultValue()
         .description("The odps endpoint.");
 
+    @Deprecated
     public static final ConfigKey GEAFLOW_DSL_ODPS_PARTITION_SPEC = ConfigKeys
         .key("geaflow.dsl.odps.partition.spec")
         .defaultValue("")
@@ -58,6 +59,11 @@ public class OdpsConfigKeys {
         .defaultValue(1000)
         .description("The buffer size of odps sink buffer.");
 
+    public static final ConfigKey GEAFLOW_DSL_ODPS_SINK_FLUSH_INTERVAL_MS = ConfigKeys
+            .key("geaflow.dsl.odps.sink.flush.interval.ms")
+            .defaultValue(10000)
+            .description("The flush interval of odps sink buffer.");
+
     public static final ConfigKey GEAFLOW_DSL_ODPS_TIMEOUT_SECONDS = ConfigKeys
         .key("geaflow.dsl.odps.timeout.seconds")
         .defaultValue(60)
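
As a sketch of how the new key pairs with the existing buffer size (the key
names come from this file; the map-based wiring below is illustrative rather
than the actual configuration entry point):

    // A record batch is flushed when either threshold is reached first;
    // non-positive values fall back to the sink's built-in defaults.
    Map<String, String> sinkConf = new HashMap<>();
    sinkConf.put("geaflow.dsl.odps.sink.buffer.size", "1000");        // flush at 1000 records
    sinkConf.put("geaflow.dsl.odps.sink.flush.interval.ms", "10000"); // or after 10 seconds
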
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
index 8d331dc0e..e6c22bc70 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSink.java
@@ -23,25 +23,24 @@ import com.aliyun.odps.Column;
 import com.aliyun.odps.Odps;
 import com.aliyun.odps.PartitionSpec;
 import com.aliyun.odps.Table;
+import com.aliyun.odps.TableSchema;
 import com.aliyun.odps.account.Account;
 import com.aliyun.odps.account.AliyunAccount;
 import com.aliyun.odps.data.ArrayRecord;
-import com.aliyun.odps.data.Record;
-import com.aliyun.odps.data.RecordWriter;
 import com.aliyun.odps.tunnel.TableTunnel;
-import com.aliyun.odps.tunnel.TableTunnel.UploadSession;
 import com.aliyun.odps.tunnel.TunnelException;
-import com.aliyun.odps.utils.StringUtils;
 import com.google.common.base.Preconditions;
 import com.google.common.base.Strings;
 import java.io.IOException;
-import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
+import javax.annotation.Nullable;
 import org.apache.geaflow.api.context.RuntimeContext;
 import org.apache.geaflow.common.config.Configuration;
 import org.apache.geaflow.dsl.common.data.Row;
@@ -56,24 +55,24 @@ public class OdpsTableSink implements TableSink {
     private static final Logger LOGGER = LoggerFactory.getLogger(OdpsTableSink.class);
 
     private int bufferSize = 1000;
+    private int flushIntervalMs = Integer.MAX_VALUE;
+
     private int timeoutSeconds = 60;
     private String endPoint;
     private String project;
     private String tableName;
     private String accessKey;
     private String accessId;
-    private String partitionSpec;
-    private String shardNamePrefix;
     private StructType schema;
+    private String partitionSpec;
 
-    private transient Odps odps;
-    private transient Table table;
     private transient TableTunnel tunnel;
-    private transient UploadSession uploadSession;
-    private transient RecordWriter writer;
     private transient Column[] recordColumns;
     private transient int[] columnIndex;
-    private transient List<Object[]> buffer;
+    private transient PartitionExtractor partitionExtractor;
+    private transient Map<String, PartitionWriter> partitionWriters;
+
+    private final ExecutorService executor = Executors.newSingleThreadExecutor();
 
     @Override
     public void init(Configuration tableConf, StructType schema) {
@@ -90,53 +89,46 @@ public class OdpsTableSink implements TableSink {
         this.accessKey = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_ACCESS_KEY);
         this.accessId = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_ACCESS_ID);
         this.partitionSpec = tableConf.getString(OdpsConfigKeys.GEAFLOW_DSL_ODPS_PARTITION_SPEC);
-        this.shardNamePrefix = project + "-" + tableName + "-";
         int bufferSize = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_SINK_BUFFER_SIZE);
         if (bufferSize > 0) {
             this.bufferSize = bufferSize;
         }
+        int flushIntervalMs = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_SINK_FLUSH_INTERVAL_MS);
+        if (flushIntervalMs > 0) {
+            this.flushIntervalMs = flushIntervalMs;
+        }
         int timeoutSeconds = tableConf.getInteger(OdpsConfigKeys.GEAFLOW_DSL_ODPS_TIMEOUT_SECONDS);
         if (timeoutSeconds > 0) {
             this.timeoutSeconds = timeoutSeconds;
         }
         checkArguments();
 
-        LOGGER.info("init odps table sink, endPoint : {},  project : {}, 
tableName : {}, "
-            + "partitionSpec: {}", endPoint, project, tableName, 
partitionSpec);
+        LOGGER.info("init odps table sink, endPoint : {},  project : {}, 
tableName : {}",
+                endPoint, project, tableName);
     }
 
     @Override
     public void open(RuntimeContext context) {
-        this.buffer = new ArrayList<>(bufferSize);
         Account account = new AliyunAccount(accessId, accessKey);
-        this.odps = new Odps(account);
+        Odps odps = new Odps(account);
         odps.setEndpoint(endPoint);
         odps.setDefaultProject(project);
-        this.table = odps.tables().get(project, tableName);
         this.tunnel = new TableTunnel(odps);
-        if (StringUtils.isEmpty(partitionSpec)) {
-            throw new GeaFlowDSLException("For ODPS sink, partition spec 
cannot be empty.");
-        }
-        PartitionSpec usePartitionSpec = new PartitionSpec(partitionSpec);
-        ExecutorService executor = Executors.newSingleThreadExecutor();
-        Future<UploadSession> future = executor.submit(() -> {
-            try {
-                return tunnel.createUploadSession(project, tableName, usePartitionSpec);
-            } catch (TunnelException e) {
-                throw new GeaFlowDSLException("Cannot get odps session.", e);
-            }
-        });
-        try {
-            this.uploadSession = future.get(this.timeoutSeconds, TimeUnit.SECONDS);
-        } catch (Exception e) {
-            throw new GeaFlowDSLException("Cannot list partitions from ODPS, 
endPoint: " + this.endPoint, e);
-        }
-        this.recordColumns = this.uploadSession.getSchema().getColumns().toArray(new Column[0]);
+        Table table = odps.tables().get(tableName);
+        TableSchema tableSchema = table.getSchema();
+        this.recordColumns = tableSchema.getColumns().toArray(new Column[0]);
         this.columnIndex = new int[recordColumns.length];
         for (int i = 0; i < this.recordColumns.length; i++) {
             String columnName = this.recordColumns[i].getName();
             columnIndex[i] = this.schema.indexOf(columnName);
         }
+        if (this.partitionSpec != null && !this.partitionSpec.isEmpty()) {
+            this.partitionExtractor = DefaultPartitionExtractor.create(this.partitionSpec, schema);
+        } else {
+            List<Column> partitionColumns = tableSchema.getPartitionColumns();
+            this.partitionExtractor = DefaultPartitionExtractor.create(partitionColumns, schema);
+        }
+        this.partitionWriters = new HashMap<>();
     }
 
     @Override
@@ -149,58 +141,66 @@ public class OdpsTableSink implements TableSink {
                 values[i] = null;
             }
         }
-        buffer.add(values);
-        if (buffer.size() >= bufferSize) {
-            try {
-                flush();
-            } catch (Exception e) {
-                throw new IOException(e);
-            }
-        }
+        PartitionWriter writer = createOrGetWriter(partitionExtractor.extractPartition(row));
+        writer.write(new ArrayRecord(recordColumns, values));
     }
 
     @Override
     public void finish() throws IOException {
-        try {
-            flush();
-        } catch (Exception e) {
-            throw new IOException(e);
-        }
+        flush();
     }
 
     @Override
     public void close() {
         LOGGER.info("close.");
-        if (writer != null) {
-            try {
-                writer.close();
-            } catch (IOException e) {
-                throw new GeaFlowDSLException("Error when closing Odps 
writer.", e);
+        flush();
+    }
+
+    private void flush() {
+        try {
+            for (PartitionWriter writer : partitionWriters.values()) {
+                writer.flush();
             }
+        } catch (IOException e) {
+            throw new RuntimeException("Flush data error.", e);
         }
     }
 
-    private void flush() throws IOException {
-        if (buffer.isEmpty()) {
-            return;
+    /**
+     * Create or get writer.
+     * @param partition the partition
+     * @return a writer
+     */
+    private PartitionWriter createOrGetWriter(String partition) {
+        PartitionWriter partitionWriter = partitionWriters.get(partition);
+        if (partitionWriter == null) {
+            TableTunnel.StreamUploadSession session = createUploadSession(partition);
+            partitionWriter = new PartitionWriter(session, bufferSize, flushIntervalMs);
+            partitionWriters.put(partition, partitionWriter);
         }
-        List<Object[]> flushBuffer = buffer;
-        buffer = new ArrayList<>(bufferSize);
-        List<Record> records = new ArrayList<>();
-        for (Object[] value : flushBuffer) {
-            Record record = new ArrayRecord(recordColumns, value);
-            records.add(record);
-        }
-        if (writer == null) {
+        return partitionWriter;
+    }
+
+    /**
+     * Create an upload session.
+     * @param partition the partition
+     * @return an upload session
+     */
+    private TableTunnel.StreamUploadSession createUploadSession(@Nullable String partition) {
+        Future<TableTunnel.StreamUploadSession> future = executor.submit(() -> {
             try {
-                assert uploadSession != null : "The uploadSession has not been open.";
-                writer = uploadSession.openBufferedWriter();
+                if (partition == null || partition.isEmpty()) {
+                    return tunnel.createStreamUploadSession(project, tableName);
+                }
+                return tunnel.createStreamUploadSession(project, tableName, new PartitionSpec(partition));
             } catch (TunnelException e) {
-                throw new GeaFlowDSLException("Cannot get Odps writer.", e);
+                throw new GeaFlowDSLException("Cannot get odps session.", e);
             }
-        }
-        for (Record record : records) {
-            writer.write(record);
+        });
+        try {
+            return future.get(this.timeoutSeconds, TimeUnit.SECONDS);
+        } catch (Exception e) {
+            throw new GeaFlowDSLException("Create stream upload session with 
endpoint " + this.endPoint + " failed", e);
         }
     }
 
@@ -208,8 +208,7 @@ public class OdpsTableSink implements TableSink {
         Preconditions.checkArgument(!Strings.isNullOrEmpty(endPoint), "endPoint is null");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(project), "project is null");
         Preconditions.checkArgument(!Strings.isNullOrEmpty(tableName), "tableName is null");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(endPoint), "accessKey is null");
-        Preconditions.checkArgument(!Strings.isNullOrEmpty(project), "accessId is null");
-        Preconditions.checkNotNull(new PartitionSpec(partitionSpec));
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(accessId), "accessId is null");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(accessKey), "accessKey is null");
     }
 }
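
Taken together, the sink's new per-partition write path condenses to the
following (identifiers match the diff above; error handling elided):

    // For each incoming row: derive its partition string, route it to a
    // cached per-partition writer, and let the writer batch and flush.
    String partition = partitionExtractor.extractPartition(row);
    PartitionWriter writer = createOrGetWriter(partition); // lazily opens a StreamUploadSession
    writer.write(new ArrayRecord(recordColumns, values));  // flushes on size or interval
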
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java
new file mode 100644
index 000000000..9552e448b
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionExtractor.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import org.apache.geaflow.dsl.common.data.Row;
+
+/**
+ * ODPS partition extractor.
+ */
+public interface PartitionExtractor {
+
+    /**
+     * extract partition from row.
+     * @param row the input row
+     * @return the partition
+     */
+    String extractPartition(Row row);
+}
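
Since the interface is a single abstract method, trivial extractors can be
plain lambdas, as DefaultPartitionExtractor.create already does for the
empty-spec case; the constant below is only an illustration:

    // A fixed-partition extractor that ignores the row entirely.
    PartitionExtractor fixed = row -> "dt=20251120";
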
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java
new file mode 100644
index 000000000..2d78cf742
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/main/java/org/apache/geaflow/dsl/connector/odps/PartitionWriter.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import com.aliyun.odps.data.Record;
+import com.aliyun.odps.tunnel.TableTunnel;
+import java.io.IOException;
+
+public class PartitionWriter {
+
+    private final TableTunnel.StreamRecordPack recordPack;
+    private final int batchSize;
+    private final int flushIntervalMs;
+    private long lastFlushTime;
+
+    public PartitionWriter(TableTunnel.StreamUploadSession uploadSession, int batchSize, int flushIntervalMs) {
+        try {
+            this.recordPack = uploadSession.newRecordPack();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        this.batchSize = batchSize;
+        this.flushIntervalMs = flushIntervalMs;
+        this.lastFlushTime = System.currentTimeMillis();
+    }
+
+    /**
+     * Write a record to the stream, if the batch size is reached
+     * or the flush interval is reached, flush the stream.
+     * @param record The record to write.
+     * @throws IOException If an I/O error occurs.
+     */
+    public void write(Record record) throws IOException {
+        recordPack.append(record);
+        if (recordPack.getRecordCount() >= batchSize
+                || System.currentTimeMillis() - lastFlushTime > flushIntervalMs) {
+            recordPack.flush();
+            lastFlushTime = System.currentTimeMillis();
+        }
+    }
+
+    /**
+     * Flush the stream.
+     * @throws IOException If an I/O error occurs.
+     */
+    public void flush() throws IOException {
+        recordPack.flush();
+    }
+}
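
A standalone usage sketch for the writer above (normally it is created and
cached by OdpsTableSink#createOrGetWriter; the project, table, and partition
values here are hypothetical):

    // One writer per partition spec; it flushes itself on batch size or
    // interval, and the caller flushes once more on finish/close.
    TableTunnel.StreamUploadSession session = tunnel.createStreamUploadSession(
            "my_project", "my_table", new PartitionSpec("dt=20251120,hh=10"));
    PartitionWriter writer = new PartitionWriter(session, 1000, 10000);
    writer.write(new ArrayRecord(recordColumns, values));
    writer.flush();
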
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java
new file mode 100644
index 000000000..0532cc9c8
--- /dev/null
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/DefaultPartitionExtractorTest.java
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.geaflow.dsl.connector.odps;
+
+import org.apache.geaflow.common.type.IType;
+import org.apache.geaflow.common.type.primitive.IntegerType;
+import org.apache.geaflow.common.type.primitive.StringType;
+import org.apache.geaflow.dsl.common.data.Row;
+import org.apache.geaflow.dsl.common.data.impl.ObjectRow;
+import org.apache.geaflow.dsl.common.types.StructType;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class DefaultPartitionExtractorTest {
+
+    @Test
+    public void testExtract() {
+        Row row1 = ObjectRow.create(1, 2025111, 10, 3.14, "a11");
+        Row row2 = ObjectRow.create(1, 2025111, 11, 3.14, "b11");
+        Row row3 = ObjectRow.create(1, 2025111, 12, 3.14, "c11");
+        Row row4 = ObjectRow.create(1, 2025111, 13, 3.14, "d11");
+        Row row5 = ObjectRow.create(1, 2025111, 14, 3.14, "e11");
+        Row row6 = ObjectRow.create(1, 2025110, 11, 3.14, "null");
+
+        String spec1 = "dt=$dt,hh=$hh,biz=$biz";
+        DefaultPartitionExtractor extractor1 = new DefaultPartitionExtractor(
+                spec1, new int[]{1, 2, 4},
+                new IType[]{IntegerType.INSTANCE, IntegerType.INSTANCE, StringType.INSTANCE});
+        Assert.assertEquals("dt=2025111,hh=10,biz=a11", 
extractor1.extractPartition(row1));
+        Assert.assertEquals("dt=2025111,hh=11,biz=b11", 
extractor1.extractPartition(row2));
+        Assert.assertEquals("dt=2025111,hh=12,biz=c11", 
extractor1.extractPartition(row3));
+        Assert.assertEquals("dt=2025111,hh=13,biz=d11", 
extractor1.extractPartition(row4));
+        Assert.assertEquals("dt=2025111,hh=14,biz=e11", 
extractor1.extractPartition(row5));
+        Assert.assertEquals("dt=2025110,hh=11,biz=null", 
extractor1.extractPartition(row6));
+
+        String spec2 = "dt=$dt/hh=$hh/biz=$biz";
+        DefaultPartitionExtractor extractor2 = new DefaultPartitionExtractor(
+                spec2, new int[]{1, 2, 4},
+                new IType[]{IntegerType.INSTANCE, IntegerType.INSTANCE, StringType.INSTANCE});
+        Assert.assertEquals("dt=2025111/hh=10/biz=a11", 
extractor2.extractPartition(row1));
+        Assert.assertEquals("dt=2025111/hh=11/biz=b11", 
extractor2.extractPartition(row2));
+        Assert.assertEquals("dt=2025111/hh=12/biz=c11", 
extractor2.extractPartition(row3));
+        Assert.assertEquals("dt=2025111/hh=13/biz=d11", 
extractor2.extractPartition(row4));
+        Assert.assertEquals("dt=2025111/hh=14/biz=e11", 
extractor2.extractPartition(row5));
+        Assert.assertEquals("dt=2025110/hh=11/biz=null", 
extractor2.extractPartition(row6));
+
+
+        String spec3 = "dt=$dt";
+        DefaultPartitionExtractor extractor3 = new DefaultPartitionExtractor(
+                spec3, new int[]{1},
+                new IType[]{IntegerType.INSTANCE});
+        Assert.assertEquals("dt=2025111", extractor3.extractPartition(row1));
+        Assert.assertEquals("dt=2025111", extractor3.extractPartition(row2));
+        Assert.assertEquals("dt=2025111", extractor3.extractPartition(row3));
+        Assert.assertEquals("dt=2025111", extractor3.extractPartition(row4));
+        Assert.assertEquals("dt=2025111", extractor3.extractPartition(row5));
+        Assert.assertEquals("dt=2025110", extractor3.extractPartition(row6));
+
+
+        String spec4 = "dt=20251120";
+        DefaultPartitionExtractor extractor4 = new DefaultPartitionExtractor(
+                spec4, new int[]{},
+                new IType[]{});
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row1));
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row2));
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row3));
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row4));
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row5));
+        Assert.assertEquals("dt=20251120", extractor4.extractPartition(row6));
+
+        String spec5 = "dt=20251120,hh=$hh";
+        DefaultPartitionExtractor extractor5 = new DefaultPartitionExtractor(
+                spec5, new int[]{2},
+                new IType[]{IntegerType.INSTANCE});
+        Assert.assertEquals("dt=20251120,hh=10", 
extractor5.extractPartition(row1));
+        Assert.assertEquals("dt=20251120,hh=11", 
extractor5.extractPartition(row2));
+        Assert.assertEquals("dt=20251120,hh=12", 
extractor5.extractPartition(row3));
+        Assert.assertEquals("dt=20251120,hh=13", 
extractor5.extractPartition(row4));
+        Assert.assertEquals("dt=20251120,hh=14", 
extractor5.extractPartition(row5));
+        Assert.assertEquals("dt=20251120,hh=11", 
extractor5.extractPartition(row6));
+
+        PartitionExtractor extractor6 = DefaultPartitionExtractor.create("", null);
+        Assert.assertEquals("", extractor6.extractPartition(row1));
+        Assert.assertEquals("", extractor6.extractPartition(row2));
+        Assert.assertEquals("", extractor6.extractPartition(row3));
+        Assert.assertEquals("", extractor6.extractPartition(row4));
+        Assert.assertEquals("", extractor6.extractPartition(row5));
+        Assert.assertEquals("", extractor6.extractPartition(row6));
+
+    }
+
+    @Test
+    public void testUnquoted() {
+        Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("dt"));
+        Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("`dt`"));
+        Assert.assertEquals("dt", DefaultPartitionExtractor.unquoted("'dt'"));
+        Assert.assertEquals("dt", 
DefaultPartitionExtractor.unquoted("\"dt\""));
+    }
+}
diff --git a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
index dfeb552d7..a31756ef7 100644
--- a/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
+++ b/geaflow/geaflow-dsl/geaflow-dsl-connector/geaflow-dsl-connector-odps/src/test/java/org/apache/geaflow/dsl/connector/odps/OdpsTableSourceTest.java
@@ -133,15 +133,15 @@ public class OdpsTableSourceTest {
 
     @Test
     public void testOdpsConnectorUtils() {
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.SMALLINT, LongType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.INT, LongType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.FLOAT, FloatType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DOUBLE, DoubleType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.BOOLEAN, BooleanType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.STRING, BinaryStringType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.BINARY, ByteType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DECIMAL, DecimalType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.VOID, VoidType.INSTANCE), true);
-        Assert.assertEquals(OdpsConnectorUtils.typeEquals(OdpsType.DATE, TimestampType.INSTANCE), true);
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.SMALLINT, LongType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.INT, LongType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.FLOAT, FloatType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DOUBLE, DoubleType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.BOOLEAN, BooleanType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.STRING, BinaryStringType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.BINARY, ByteType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DECIMAL, DecimalType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.VOID, VoidType.INSTANCE));
+        Assert.assertTrue(OdpsConnectorUtils.typeEquals(OdpsType.DATE, TimestampType.INSTANCE));
     }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

