czy006 commented on code in PR #4124:
URL: https://github.com/apache/amoro/pull/4124#discussion_r3055303846
##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/lookup/BasicLookupFunction.java:
##########
@@ -244,20 +241,16 @@ private void checkErrorAndRethrow() {
}
private String generateRocksDBPath(FunctionContext context, String tableName) {
- String tmpPath = getTmpDirectoryFromTMContainer(context);
+ String tmpPath = getTmpDirectory(context);
Review Comment:
The new getTmpDirectory() silently changes where RocksDB stores its files. The original implementation read taskmanager.tmp.dirs via TaskManagerRuntimeInfo — the directories explicitly configured for I/O-heavy workloads, often pointing to large dedicated disks. The replacement falls back to System.getProperty("java.io.tmpdir"), which is typically /tmp on the host OS. In a production cluster this can cause:
- Disk exhaustion on a small system partition
- RocksDB files landing on the wrong disk tier

The job-parameter approach context.getJobParameter("java.io.tmpdir", null) also requires every user to manually thread a new job parameter through their submission config — a silent breaking change for existing deployments.
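The breaking-change claim can be made concrete with a small, self-contained sketch (plain Java, no Flink types; JobParamFallbackDemo and resolveTmpDir are hypothetical names for illustration, not from the PR):

```java
import java.util.Map;

/**
 * Hypothetical illustration of the job-parameter fallback chain: unless a
 * deployment explicitly sets "java.io.tmpdir" in its job parameters, the
 * lookup silently falls back to the JVM default, which is usually /tmp.
 */
public class JobParamFallbackDemo {
  /** Mimics context.getJobParameter("java.io.tmpdir", null) plus the fallback. */
  static String resolveTmpDir(Map<String, String> jobParams) {
    String configured = jobParams.get("java.io.tmpdir");
    // Existing deployments never set this key, so they all take the fallback.
    return configured != null ? configured : System.getProperty("java.io.tmpdir");
  }
}
```

Every job submitted before this change would hit the fallback branch, which is exactly the silent relocation described above.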
The reflection was the only blocker here; the surrounding logic was correct. A cleaner fix that avoids reflection and preserves semantics:
1. Make BasicLookupFunction extend RichTableFunction (or keep it as is but accept a tmp-dir supplier injected at construction time).
2. In open(), call getRuntimeContext().getTaskManagerRuntimeInfo().getTmpDirectories() — a fully public API since Flink 1.14, no reflection needed.
A first attempt at a drop-in replacement for getTmpDirectory() goes through the metric group, but that route is not viable — the scope variables it exposes do not include the TaskManager tmp directories:
```
private static String getTmpDirectory(FunctionContext context) {
  // FunctionContext wraps a RuntimeContext, but the only public surface
  // reachable here is the metric group (FLINK-17165, Flink 1.14+), and its
  // variables do not contain taskmanager.tmp.dirs -- not viable this way.
  try {
    Map<String, String> vars = context.getMetricGroup().getAllVariables();
    // no tmp-dir entry among the scope variables
  } catch (Exception ignored) {
    // fall through to the JVM default
  }
  return System.getProperty("java.io.tmpdir");
}
```
Actually, the cleanest solution with zero reflection is to convert BasicLookupFunction to extend RichFunction directly and access getRuntimeContext() in open():
```
@Override
public void open(Configuration parameters) throws Exception {
  // TaskManager tmp dirs as configured via taskmanager.tmp.dirs;
  // pick one at random to spread I/O across disks.
  String[] dirs = getRuntimeContext()
      .getTaskManagerRuntimeInfo()
      .getTmpDirectories();
  this.rocksDBTmpDir = dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
  ...
}
```
This is a public, stable API — no setAccessible required — and fully restores the original behavior.
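Option 1 above (injecting a tmp-dir supplier at construction time) can also be sketched without any Flink dependency; TmpDirAwareLookup and tmpDirSupplier are hypothetical names for illustration, not from the PR:

```java
import java.util.concurrent.ThreadLocalRandom;
import java.util.function.Supplier;

/**
 * Hypothetical sketch of the supplier-injection variant: the caller decides
 * where the configured tmp dirs come from (e.g. the TaskManager runtime info),
 * and the lookup function stays free of both reflection and Flink internals.
 */
public class TmpDirAwareLookup {
  private final Supplier<String[]> tmpDirSupplier;

  public TmpDirAwareLookup(Supplier<String[]> tmpDirSupplier) {
    this.tmpDirSupplier = tmpDirSupplier;
  }

  /** Pick one of the configured tmp dirs, falling back to the JVM default. */
  public String resolveTmpDir() {
    String[] dirs = tmpDirSupplier.get();
    if (dirs == null || dirs.length == 0) {
      return System.getProperty("java.io.tmpdir");
    }
    // Random choice spreads RocksDB instances across the configured disks.
    return dirs[ThreadLocalRandom.current().nextInt(dirs.length)];
  }
}
```

The supplier keeps the class unit-testable and leaves the decision of which runtime API to read the dirs from at the construction site.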
##########
amoro-format-mixed/amoro-mixed-flink/amoro-mixed-flink-common/src/main/java/org/apache/amoro/flink/table/UnkeyedStreamingReaderOperator.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.amoro.flink.table;
+
+import org.apache.amoro.shade.guava32.com.google.common.base.Preconditions;
+import org.apache.amoro.shade.guava32.com.google.common.collect.Lists;
+import org.apache.flink.api.common.operators.MailboxExecutor;
+import org.apache.flink.api.common.state.ListState;
+import org.apache.flink.api.common.state.ListStateDescriptor;
+import org.apache.flink.runtime.state.JavaSerializer;
+import org.apache.flink.runtime.state.StateInitializationContext;
+import org.apache.flink.runtime.state.StateSnapshotContext;
+import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.api.operators.StreamSourceContexts;
+import org.apache.flink.streaming.api.watermark.Watermark;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.runtime.tasks.ProcessingTimeService;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.util.function.ThrowingRunnable;
+import org.apache.iceberg.flink.source.FlinkInputFormat;
+import org.apache.iceberg.flink.source.FlinkInputSplit;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.Queue;
+
+/** Minimal reader operator to avoid depending on Iceberg's non-public StreamingReaderOperator. */
+public class UnkeyedStreamingReaderOperator extends AbstractStreamOperator<RowData>
+    implements OneInputStreamOperator<FlinkInputSplit, RowData> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(UnkeyedStreamingReaderOperator.class);
+
+ private final MailboxExecutor executor;
+ private FlinkInputFormat format;
+ private transient SourceFunction.SourceContext<RowData> sourceContext;
+ private transient ListState<FlinkInputSplit> inputSplitsState;
+ private transient Queue<FlinkInputSplit> splits;
+ private transient SplitState currentSplitState;
+
+  public UnkeyedStreamingReaderOperator(
+      FlinkInputFormat format, ProcessingTimeService timeService, MailboxExecutor mailboxExecutor) {
+    this.format = Preconditions.checkNotNull(format, "The InputFormat should not be null.");
+    this.processingTimeService = timeService;
+    this.executor =
+        Preconditions.checkNotNull(mailboxExecutor, "The mailboxExecutor should not be null.");
+  }
+
+  @Override
+  public void initializeState(StateInitializationContext context) throws Exception {
Review Comment:
FlinkInputSplit implements Serializable and already has a public TypeInformation. Use Flink's public TypeSerializer instead:
```
import org.apache.flink.api.java.typeutils.TypeExtractor;
...
TypeSerializer<FlinkInputSplit> serializer =
    TypeExtractor.getForClass(FlinkInputSplit.class)
        .createSerializer(getRuntimeContext().getExecutionConfig());
inputSplitsState = context.getOperatorStateStore()
    .getListState(new ListStateDescriptor<>("splits", serializer));
```
Or, if a simple Kryo-based fallback is acceptable:
```
new ListStateDescriptor<>("splits", FlinkInputSplit.class)
```
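Since both suggestions hinge on the split type being Serializable, here is a minimal stdlib sketch of the round-trip that plain Java serialization (what JavaSerializer relies on) performs during checkpointing; FakeSplit is a hypothetical stand-in for FlinkInputSplit, not Flink code:

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Demonstrates that a Serializable split survives a serialize/deserialize cycle. */
public class SerializableRoundTrip {
  /** Hypothetical stand-in for FlinkInputSplit. */
  static class FakeSplit implements Serializable {
    private static final long serialVersionUID = 1L;
    final int splitNumber;

    FakeSplit(int splitNumber) {
      this.splitNumber = splitNumber;
    }
  }

  /** Serialize to bytes and read the object back, as a state snapshot would. */
  static FakeSplit roundTrip(FakeSplit split) throws IOException, ClassNotFoundException {
    ByteArrayOutputStream bos = new ByteArrayOutputStream();
    try (ObjectOutputStream out = new ObjectOutputStream(bos)) {
      out.writeObject(split);
    }
    try (ObjectInputStream in =
        new ObjectInputStream(new ByteArrayInputStream(bos.toByteArray()))) {
      return (FakeSplit) in.readObject();
    }
  }
}
```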