This is an automated email from the ASF dual-hosted git repository.

diwu pushed a commit to branch streaming-job-dev
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/streaming-job-dev by this push:
     new ad763ce8e57 [Feature](WIP) Add fetch meta and fix rewrite tvf problem 
(#55986)
ad763ce8e57 is described below

commit ad763ce8e57a1f790d78117ca0756f79a4901418
Author: wudi <[email protected]>
AuthorDate: Fri Sep 12 22:15:05 2025 +0800

    [Feature](WIP) Add fetch meta and fix rewrite tvf problem (#55986)
    
    ### What problem does this PR solve?
    
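     Add fetch meta and fix rewrite tvf problem: fetch the remote source metadata for streaming insert jobs (for S3, the max remote end file), and fix the TVF parameter rewrite by rewriting the logical plan with the new TVF properties.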
---
 .../insert/streaming/StreamingInsertJob.java       | 64 +++++++++++----
 .../insert/streaming/StreamingInsertTask.java      | 32 ++++----
 .../doris/job/offset/SourceOffsetProvider.java     | 25 ++++--
 .../job/offset/s3/S3SourceOffsetProvider.java      | 91 ++++++++++++++--------
 .../commands/insert/InsertIntoTableCommand.java    | 47 ++---------
 5 files changed, 146 insertions(+), 113 deletions(-)

diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
index 84e42c6e1f6..a4e05b650e3 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertJob.java
@@ -56,6 +56,7 @@ import com.google.common.base.Preconditions;
 import com.google.gson.annotations.SerializedName;
 import lombok.Getter;
 import lombok.Setter;
+import lombok.extern.log4j.Log4j2;
 import org.apache.commons.collections.CollectionUtils;
 
 import java.io.DataOutput;
@@ -65,6 +66,7 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
 
+@Log4j2
 public class StreamingInsertJob extends AbstractJob<StreamingJobSchedulerTask, 
Map<Object, Object>> implements
         TxnStateChangeCallback, GsonPostProcessable {
     private final long dbId;
@@ -83,14 +85,16 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     @SerializedName("jp")
     private StreamingJobProperties jobProperties;
     @Getter
-    @SerializedName("tt")
+    @SerializedName("tvf")
     private String tvfType;
+    private Map<String, String> originTvfProps;
     @Getter
     StreamingInsertTask runningStreamTask;
     SourceOffsetProvider offsetProvider;
     @Setter
     @Getter
     private long lastScheduleTaskTimestamp = -1L;
+    private InsertIntoTableCommand baseCommand;
 
     public StreamingInsertJob(String jobName,
             JobStatus jobStatus,
@@ -105,17 +109,28 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
                 jobConfig, createTimeMs, executeSql);
         this.dbId = ConnectContext.get().getCurrentDbId();
         this.jobProperties = jobProperties;
-        this.tvfType = parseTvfType();
-        this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
-        this.offsetProvider.init(getExecuteSql(), jobProperties);
+        init();
     }
 
-    private String parseTvfType() {
-        NereidsParser parser = new NereidsParser();
-        InsertIntoTableCommand command = (InsertIntoTableCommand) 
parser.parseSingle(getExecuteSql());
-        UnboundTVFRelation firstTVF = command.getFirstTVF();
-        Preconditions.checkNotNull(firstTVF, "Only support insert sql with 
tvf");
-        return firstTVF.getFunctionName();
+    private void init() {
+        try {
+            UnboundTVFRelation currentTvf = getCurrentTvf();
+            this.originTvfProps = currentTvf.getProperties().getMap();
+            this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(currentTvf.getFunctionName());
+        } catch (Exception ex) {
+            log.warn("init streaming insert job failed, sql: {}", 
getExecuteSql(), ex);
+            throw new RuntimeException("init streaming insert job failed, sql: 
" + getExecuteSql(), ex);
+        }
+    }
+
+    private UnboundTVFRelation getCurrentTvf() throws Exception {
+        if (baseCommand == null) {
+            this.baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(getExecuteSql());
+        }
+        List<UnboundTVFRelation> allTVFRelation = 
baseCommand.getAllTVFRelation();
+        Preconditions.checkArgument(allTVFRelation.size() == 1, "Only support 
one source in insert streaming job");
+        UnboundTVFRelation unboundTVFRelation = allTVFRelation.get(0);
+        return unboundTVFRelation;
     }
 
     @Override
@@ -155,7 +170,14 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
     }
 
     protected void fetchMeta() {
-        offsetProvider.fetchRemoteMeta();
+        try {
+            if (originTvfProps == null) {
+                this.originTvfProps = getCurrentTvf().getProperties().getMap();
+            }
+            offsetProvider.fetchRemoteMeta(originTvfProps);
+        } catch (Exception ex) {
+            log.warn("fetch remote meta failed, job id: {}", getJobId(), ex);
+        }
     }
 
     public boolean needScheduleTask() {
@@ -228,8 +250,19 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getFailedTaskCount().get())));
         trow.addToColumnValue(new 
TCell().setStringVal(String.valueOf(getCanceledTaskCount().get())));
         trow.addToColumnValue(new TCell().setStringVal(getComment()));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
-        trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+
+        if (offsetProvider != null && offsetProvider.getSyncOffset() != null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getSyncOffset()));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        }
+        if (offsetProvider != null && offsetProvider.getRemoteOffset() != 
null) {
+            trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getRemoteOffset()));
+        } else {
+            trow.addToColumnValue(new 
TCell().setStringVal(FeConstants.null_string));
+        }
+
+        trow.addToColumnValue(new 
TCell().setStringVal(offsetProvider.getRemoteOffset()));
         trow.addToColumnValue(new TCell().setStringVal(
                 jobStatistic == null ? FeConstants.null_string : 
jobStatistic.toJson()));
         trow.addToColumnValue(new TCell().setStringVal(failMsg == null ? 
FeConstants.null_string : failMsg.getMsg()));
@@ -324,9 +357,8 @@ public class StreamingInsertJob extends 
AbstractJob<StreamingJobSchedulerTask, M
 
     @Override
     public void gsonPostProcess() throws IOException {
-        if (offsetProvider == null && jobProperties != null && tvfType != 
null) {
-            this.offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
-            // this.offsetProvider.init(getExecuteSql(), jobProperties);
+        if (offsetProvider == null) {
+            offsetProvider = 
SourceOffsetProviderFactory.createSourceOffsetProvider(tvfType);
         }
     }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
index 1227624fc8f..73ac267e678 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/extensions/insert/streaming/StreamingInsertTask.java
@@ -57,15 +57,15 @@ public class StreamingInsertTask {
     private Long startTimeMs;
     private Long finishTimeMs;
     private String sql;
-    private SourceOffsetProvider offsetProvider;
     private StmtExecutor stmtExecutor;
-    private InsertIntoTableCommand command;
+    private InsertIntoTableCommand taskCommand;
     private String currentDb;
     private UserIdentity userIdentity;
     private ConnectContext ctx;
     private Offset runningOffset;
     private AtomicBoolean isCanceled = new AtomicBoolean(false);
     private StreamingJobProperties jobProperties;
+    SourceOffsetProvider offsetProvider;
 
     public StreamingInsertTask(long jobId,
                                long taskId,
@@ -73,13 +73,13 @@ public class StreamingInsertTask {
                                SourceOffsetProvider offsetProvider,
                                String currentDb,
                                StreamingJobProperties jobProperties,
-                                UserIdentity userIdentity) {
+                               UserIdentity userIdentity) {
         this.jobId = jobId;
         this.taskId = taskId;
         this.sql = sql;
-        this.offsetProvider = offsetProvider;
         this.userIdentity = userIdentity;
         this.currentDb = currentDb;
+        this.offsetProvider = offsetProvider;
         this.jobProperties = jobProperties;
         this.labelName = getJobId() + LABEL_SPLITTER + getTaskId();
         this.createTimeMs = System.currentTimeMillis();
@@ -106,7 +106,7 @@ public class StreamingInsertTask {
         }
     }
 
-    private void before() throws JobException {
+    private void before() throws Exception {
         this.status = TaskStatus.RUNNING;
         this.startTimeMs = System.currentTimeMillis();
 
@@ -117,12 +117,12 @@ public class StreamingInsertTask {
         ctx.setSessionVariable(jobProperties.getSessionVariable());
         StatementContext statementContext = new StatementContext();
         ctx.setStatementContext(statementContext);
-        offsetProvider.init(sql, jobProperties);
-        this.runningOffset = offsetProvider.getNextOffset();
-        this.command = offsetProvider.rewriteTvfParams(runningOffset);
-        this.command.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER + 
getTaskId()));
-        this.command.setJobId(getTaskId());
-        stmtExecutor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, 
ctx.getStatementContext()));
+
+        this.runningOffset = offsetProvider.getNextOffset(jobProperties, 
jobProperties.getProperties());
+        this.taskCommand = offsetProvider.rewriteTvfParams(sql, runningOffset);
+        this.taskCommand.setLabelName(Optional.of(getJobId() + LABEL_SPLITTER 
+ getTaskId()));
+        this.taskCommand.setJobId(getTaskId());
+        this.stmtExecutor = new StmtExecutor(ctx, new 
LogicalPlanAdapter(taskCommand, ctx.getStatementContext()));
     }
 
     private void run() throws JobException {
@@ -134,7 +134,7 @@ public class StreamingInsertTask {
                     log.info("task has been canceled, task id is {}", 
getTaskId());
                     return;
                 }
-                command.run(ctx, stmtExecutor);
+                taskCommand.run(ctx, stmtExecutor);
                 if (ctx.getState().getStateType() == 
QueryState.MysqlStateType.OK) {
                     return;
                 } else {
@@ -142,13 +142,13 @@ public class StreamingInsertTask {
                 }
                 log.error(
                         "streaming insert failed with {}, reason {}, to retry",
-                        command.getLabelName(),
+                        taskCommand.getLabelName(),
                         errMsg);
                 if (retry == MAX_RETRY) {
                     errMsg = "reached max retry times, failed with" + errMsg;
                 }
             } catch (Exception e) {
-                log.warn("execute insert task error, label is {},offset is 
{}", command.getLabelName(),
+                log.warn("execute insert task error, label is {},offset is 
{}", taskCommand.getLabelName(),
                          runningOffset.toJson(), e);
                 errMsg = Util.getRootCauseMessage(e);
             }
@@ -209,8 +209,8 @@ public class StreamingInsertTask {
         if (null != stmtExecutor) {
             stmtExecutor = null;
         }
-        if (null != command) {
-            command = null;
+        if (null != taskCommand) {
+            taskCommand = null;
         }
         if (null != ctx) {
             ctx = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
index 49a272c664f..d9b2264d5b6 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/SourceOffsetProvider.java
@@ -20,16 +20,13 @@ package org.apache.doris.job.offset;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
 
+import java.util.Map;
+
 /**
  * Interface for managing offsets and metadata of a data source.
  */
 public interface SourceOffsetProvider {
 
-    /**
-     * init
-     */
-    void init(String executeSql, StreamingJobProperties jobProperties);
-
     /**
      * Get source type, e.g. s3, kafka
      * @return
@@ -40,7 +37,7 @@ public interface SourceOffsetProvider {
      * Get next offset to consume
      * @return
      */
-    Offset getNextOffset();
+    Offset getNextOffset(StreamingJobProperties jobProps, Map<String, String> 
properties);
 
     /**
      * Get current offset
@@ -48,12 +45,24 @@ public interface SourceOffsetProvider {
      */
     Offset getCurrentOffset();
 
+    /**
+     * Get sync offset to show
+     * @return
+     */
+    String getSyncOffset();
+
+    /**
+     * Get remote offset
+     * @return
+     */
+    String getRemoteOffset();
+
     /**
      * Rewrite the TVF parameters in the SQL based on the current offset.
      * @param nextOffset
      * @return rewritten InsertIntoTableCommand
      */
-    InsertIntoTableCommand rewriteTvfParams(Offset nextOffset);
+    InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset 
nextOffset) throws Exception;
 
     /**
      * Update the offset of the source.
@@ -64,7 +73,7 @@ public interface SourceOffsetProvider {
     /**
      * Fetch remote meta information, such as listing files in S3 or getting 
latest offsets in Kafka.
      */
-    void fetchRemoteMeta();
+    void fetchRemoteMeta(Map<String, String> properties) throws Exception;
 
     /**
      * Whether there is more data to consume
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
index b858a1e6431..a95f4af65c9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/job/offset/s3/S3SourceOffsetProvider.java
@@ -17,51 +17,35 @@
 
 package org.apache.doris.job.offset.s3;
 
-import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.property.storage.StorageProperties;
 import org.apache.doris.fs.FileSystemFactory;
 import org.apache.doris.fs.remote.RemoteFileSystem;
 import org.apache.doris.job.extensions.insert.streaming.StreamingJobProperties;
 import org.apache.doris.job.offset.Offset;
 import org.apache.doris.job.offset.SourceOffsetProvider;
-import org.apache.doris.nereids.analyzer.UnboundTVFRelation;
 import org.apache.doris.nereids.parser.NereidsParser;
+import org.apache.doris.nereids.trees.expressions.Properties;
+import org.apache.doris.nereids.trees.expressions.functions.table.S3;
+import org.apache.doris.nereids.trees.plans.Plan;
 import 
org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalTVFRelation;
+import org.apache.doris.qe.ConnectContext;
 
+import com.google.common.collect.ImmutableList;
 import lombok.extern.log4j.Log4j2;
 
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 
 @Log4j2
 public class S3SourceOffsetProvider implements SourceOffsetProvider {
-    String executeSql;
     S3Offset currentOffset;
     String maxRemoteEndFile;
-    StreamingJobProperties jobProperties;
-    NereidsParser parser;
-    String filePath;
-    StorageProperties storageProperties;
-
-    @Override
-    public void init(String executeSql, StreamingJobProperties jobProperties) {
-        //todo: check is already init
-        this.executeSql = executeSql;
-        this.jobProperties = jobProperties;
-        this.parser = new NereidsParser();
-        InsertIntoTableCommand command = (InsertIntoTableCommand) 
parser.parseSingle(executeSql);
-        UnboundTVFRelation firstTVF = command.getFirstTVF();
-        Map<String, String> properties = firstTVF.getProperties().getMap();
-        try {
-            this.storageProperties = 
StorageProperties.createPrimary(properties);
-            String uri = storageProperties.validateAndGetUri(properties);
-            this.filePath = storageProperties.validateAndNormalizeUri(uri);
-        } catch (UserException e) {
-            throw new RuntimeException("Failed check storage props, " + 
e.getMessage(), e);
-        }
-    }
+    InsertIntoTableCommand baseCommand;
 
     @Override
     public String getSourceType() {
@@ -69,13 +53,17 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public S3Offset getNextOffset() {
+    public S3Offset getNextOffset(StreamingJobProperties jobProps, Map<String, 
String> properties) {
         S3Offset offset = new S3Offset();
         List<String> rfiles = new ArrayList<>();
         String startFile = currentOffset == null ? null : 
currentOffset.endFile;
+        String filePath = null;
+        StorageProperties storageProperties = 
StorageProperties.createPrimary(properties);
         try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
+            String uri = storageProperties.validateAndGetUri(properties);
+            filePath = storageProperties.validateAndNormalizeUri(uri);
             maxRemoteEndFile = fileSystem.globListWithLimit(filePath, rfiles, 
startFile,
-                    jobProperties.getS3BatchFiles(), 
jobProperties.getS3BatchSize());
+                    jobProps.getS3BatchFiles(), jobProps.getS3BatchSize());
             offset.setStartFile(startFile);
             offset.setEndFile(rfiles.get(rfiles.size() - 1));
             offset.setFileLists(rfiles);
@@ -92,15 +80,41 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public InsertIntoTableCommand rewriteTvfParams(Offset runningOffset) {
+    public String getSyncOffset() {
+        if (currentOffset != null) {
+            return currentOffset.getEndFile();
+        }
+        return null;
+    }
+
+    @Override
+    public String getRemoteOffset() {
+        return maxRemoteEndFile;
+    }
+
+    @Override
+    public InsertIntoTableCommand rewriteTvfParams(String executeSql, Offset 
runningOffset) throws Exception {
         S3Offset offset = (S3Offset) runningOffset;
         Map<String, String> props = new HashMap<>();
         String finalUri = "{" + String.join(",", offset.getFileLists()) + "}";
         props.put("uri", finalUri);
-        InsertIntoTableCommand command = (InsertIntoTableCommand) 
parser.parseSingle(executeSql);
-        //todo: command query plan is immutable
-        //command.rewriteFirstTvfProperties(getSourceType(), props);
-        return command;
+        if (baseCommand == null) {
+            this.baseCommand = (InsertIntoTableCommand) new 
NereidsParser().parseSingle(executeSql);
+            this.baseCommand.initPlan(ConnectContext.get(), 
ConnectContext.get().getExecutor(), false);
+        }
+
+        // rewrite plan
+        Plan rewritePlan = baseCommand.getLogicalQuery().rewriteUp(plan -> {
+            if (plan instanceof LogicalTVFRelation) {
+                LogicalTVFRelation originTvfRel = (LogicalTVFRelation) plan;
+                LogicalTVFRelation newRvfRel = new LogicalTVFRelation(
+                        originTvfRel.getRelationId(), new S3(new 
Properties(props)), ImmutableList.of());
+                return newRvfRel;
+            }
+            return plan;
+        });
+        return new InsertIntoTableCommand((LogicalPlan) rewritePlan, 
Optional.empty(), Optional.empty(),
+                Optional.empty(), true, Optional.empty());
     }
 
     @Override
@@ -109,8 +123,17 @@ public class S3SourceOffsetProvider implements 
SourceOffsetProvider {
     }
 
     @Override
-    public void fetchRemoteMeta() {
-        // list object
+    public void fetchRemoteMeta(Map<String, String> properties) throws 
Exception {
+        StorageProperties storageProperties = 
StorageProperties.createPrimary(properties);
+        String startFile = currentOffset == null ? null : 
currentOffset.endFile;
+        try (RemoteFileSystem fileSystem = 
FileSystemFactory.get(storageProperties)) {
+            String uri = storageProperties.validateAndGetUri(properties);
+            String filePath = storageProperties.validateAndNormalizeUri(uri);
+            maxRemoteEndFile = fileSystem.globListWithLimit(filePath, new 
ArrayList<>(), startFile,
+                    1, 1);
+        } catch (Exception e) {
+            throw e;
+        }
     }
 
     @Override
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
index 04a1db682c5..6736b16f8af 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/insert/InsertIntoTableCommand.java
@@ -79,11 +79,10 @@ import org.apache.commons.collections.CollectionUtils;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
+import java.util.ArrayList;
 import java.util.List;
-import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
-import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Consumer;
 import java.util.function.Supplier;
@@ -540,51 +539,21 @@ public class InsertIntoTableCommand extends Command 
implements NeedAuditEncrypti
     }
 
     // todo: add ut
-    public UnboundTVFRelation getFirstTVF() {
-        return getFirstTvfInPlan(getLogicalQuery());
+    public List<UnboundTVFRelation> getAllTVFRelation() {
+        List<UnboundTVFRelation> tvfs = new ArrayList<>();
+        findAllTVFInPlan(getLogicalQuery(), tvfs);
+        return tvfs;
     }
 
-    private UnboundTVFRelation getFirstTvfInPlan(LogicalPlan plan) {
+    private void findAllTVFInPlan(LogicalPlan plan, List<UnboundTVFRelation> 
tvfs) {
         if (plan instanceof UnboundTVFRelation) {
             UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            return tvfRelation;
+            tvfs.add(tvfRelation);
         }
 
         for (Plan child : plan.children()) {
             if (child instanceof LogicalPlan) {
-                UnboundTVFRelation result = getFirstTvfInPlan((LogicalPlan) 
child);
-                if (result != null) {
-                    return result;
-                }
-            }
-        }
-        return null;
-    }
-
-    // todo: add ut
-    public void rewriteFirstTvfProperties(String functionName, Map<String, 
String> props) {
-        AtomicBoolean found = new AtomicBoolean(false);
-        rewriteFirstTvfInPlan(originLogicalQuery, functionName, props, found);
-    }
-
-    private void rewriteFirstTvfInPlan(LogicalPlan plan,
-            String functionName, Map<String, String> props, AtomicBoolean 
found) {
-        if (found.get()) {
-            return;
-        }
-
-        if (plan instanceof UnboundTVFRelation) {
-            UnboundTVFRelation tvfRelation = (UnboundTVFRelation) plan;
-            if (functionName.equalsIgnoreCase(tvfRelation.getFunctionName())) {
-                tvfRelation.getProperties().getMap().putAll(props);
-                found.set(true);
-                return;
-            }
-        }
-
-        for (Plan child : plan.children()) {
-            if (child instanceof LogicalPlan) {
-                rewriteFirstTvfInPlan((LogicalPlan) child, functionName, 
props, found);
+                findAllTVFInPlan((LogicalPlan) child, tvfs);
             }
         }
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to