czy006 commented on code in PR #4124:
URL: https://github.com/apache/amoro/pull/4124#discussion_r3055303846


##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java:
##########
@@ -244,20 +241,16 @@ private void checkErrorAndRethrow() {
   }
 
  private String generateRocksDBPath(FunctionContext context, String tableName) {
-    String tmpPath = getTmpDirectoryFromTMContainer(context);
+    String tmpPath = getTmpDirectory(context);

Review Comment:
   The new getTmpDirectory() silently changes where RocksDB stores its files.
   
   The original implementation read taskmanager.tmp.dirs via TaskManagerRuntimeInfo: the directories explicitly configured for I/O-heavy workloads, often pointing to large dedicated disks. The replacement falls back to System.getProperty("java.io.tmpdir"), which is typically /tmp on the host OS. In a production cluster this can cause:
     - Disk exhaustion on a small system partition
     - RocksDB files landing on the wrong disk tier
   
   The job-parameter approach, context.getJobParameter("java.io.tmpdir", null), also requires every user to manually thread a new job parameter through their submission config, which is a silent breaking change for existing deployments.
   
   The reflection was the only blocker here; the surrounding logic was correct. A cleaner fix that avoids reflection and preserves semantics:
   
     1. Extend BasicLookupFunction to RichTableFunction (or keep it as is but accept a tmp-dir supplier injected at construction time).
     2. In open(), call getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories(); this is a fully public API since Flink 1.14, so no reflection is needed.
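
   For completeness, option 1 (the injected supplier) could be sketched as below. This is an illustrative, dependency-free sketch: the class name `TmpDirChooser` and its methods are hypothetical, not part of this PR.

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/** Hypothetical sketch: choose a RocksDB tmp dir from an injected supplier. */
public class TmpDirChooser {
  private final Supplier<String[]> tmpDirsSupplier;

  public TmpDirChooser(Supplier<String[]> tmpDirsSupplier) {
    this.tmpDirsSupplier = tmpDirsSupplier;
  }

  /** Picks one configured tmp dir at random; falls back to java.io.tmpdir. */
  public String chooseTmpDir() {
    String[] dirs = tmpDirsSupplier == null ? null : tmpDirsSupplier.get();
    if (dirs == null || dirs.length == 0) {
      // No explicitly configured dirs: fall back to the JVM default.
      return System.getProperty("java.io.tmpdir");
    }
    // Spread RocksDB instances across the configured I/O disks.
    return dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
  }
}
```

   In production the supplier would delegate to the task manager's configured tmp directories; in tests it can return a fixed array.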
   
   A replacement that stays within FunctionContext is not viable: the context exposes only the metric group and job parameters, not the task manager's tmp directories (the relevant public accessor landed with FLINK-17165 in Flink 1.14+). The cleanest solution with zero reflection is to convert BasicLookupFunction to extend RichFunction directly and access getRuntimeContext() in open():
   
   ```
     @Override
     public void open(Configuration parameters) throws Exception {
       String[] dirs = getRuntimeContext()
           .getTaskManagerRuntimeInfo()
           .getTmpDirectories();
       this.rocksDBTmpDir = dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
       ...
     }
   ```
   
   This is a public, stable API — no setAccessible required — and fully restores
   the original behavior.



##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.flink.table;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.flink.source.FlinkInputFormat;
+import org.apache.iceberg.flink.source.FlinkInputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+
+/** Minimal reader operator to avoid depending on Iceberg's non-public StreamingReaderOperator. */
+public class UnkeyedStreamingReaderOperator extends AbstractStreamOperator<RowData>
+    implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnkeyedStreamingReaderOperator.class);
+
+  private final MailboxExecutor executor;
+  private FlinkInputFormat format;
+  private transient SourceFunction.SourceContext<RowData> sourceContext;
+  private transient ListState<FlinkInputSplit> inputSplitsState;
+  private transient Queue<FlinkInputSplit> splits;
+  private transient SplitState currentSplitState;
+
+  public UnkeyedStreamingReaderOperator(
+      FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) {
+    this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
+    this.processingTimeService = timeService;
+    this.executor =
+        Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {

Review Comment:
   
   FlinkInputSplit implements Serializable and already has a public TypeInformation.
   Use Flink's public TypeSerializer instead:
   
   ```
     import org.apache.flink.api.java.typeutils.TypeExtractor;
     ...
     TypeSerializer<FlinkInputSplit> serializer =
         TypeExtractor.getForClass(FlinkInputSplit.class)
             .createSerializer(getRuntimeContext().getExecutionConfig());
     inputSplitsState = context.getOperatorStateStore()
         .getListState(new ListStateDescriptor<>("splits", serializer));
   ```
   
   Or, if a simple Kryo-based fallback is acceptable:
   
     new ListStateDescriptor<>("splits", FlinkInputSplit.class)
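
   Whichever serializer is chosen, the state contract itself is simple: snapshot copies the still-pending splits into list state, and restore re-enqueues them. A dependency-free model of that round trip (the class `SplitStateModel` is an illustration, not Flink API):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

/** Illustrative model of round-tripping a pending-splits queue through list state. */
public class SplitStateModel<T> {
  // FIFO queue of splits that have been assigned but not yet fully read.
  private final Queue<T> pending = new ArrayDeque<>();

  public void enqueue(T split) {
    pending.add(split);
  }

  /** Snapshot: copy the still-unread splits into the (simulated) list state. */
  public List<T> snapshotState() {
    return new ArrayList<>(pending);
  }

  /** Restore: re-enqueue the splits recorded in the last snapshot. */
  public void initializeState(List<T> restored) {
    pending.clear();
    pending.addAll(restored);
  }

  public T poll() {
    return pending.poll();
  }
}
```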



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to