stevenzwu commented on code in PR #8555:
URL: https://github.com/apache/iceberg/pull/8555#discussion_r1330740391
##########
core/src/main/java/org/apache/iceberg/io/OutputFileFactory.java:
##########
@@ -158,12 +161,31 @@ public Builder suffix(String newSuffix) {
return this;
}
+ /**
+ * Configures a {@link FileIO} supplier, which is used to dynamically refresh the file IO
Review Comment:
nit: `which is used to` -> `which can potentially`
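The point of a `FileIO` supplier is that the factory resolves the IO each time it creates a file instead of capturing one instance up front. The sketch below illustrates that behavior. `SketchIO` and `SketchOutputFileFactory` are hypothetical stand-ins, not the real `FileIO` or `OutputFileFactory` APIs.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for a FileIO: it only knows how to build output paths.
final class SketchIO {
  private final String generation;

  SketchIO(String generation) {
    this.generation = generation;
  }

  String newOutputPath(String name) {
    return generation + "/" + name;
  }
}

// Hypothetical factory: it calls the supplier on every new file instead of caching the IO.
final class SketchOutputFileFactory {
  private final Supplier<SketchIO> ioSupplier;
  private int fileCount;

  SketchOutputFileFactory(Supplier<SketchIO> ioSupplier) {
    this.ioSupplier = ioSupplier;
  }

  String newOutputFile() {
    fileCount++;
    // If whatever backs the supplier has been refreshed, the next file uses the new IO.
    return ioSupplier.get().newOutputPath("data-" + fileCount + ".parquet");
  }
}
```

Whether a refresh actually happens depends on the supplier, which is why "can potentially" is the more accurate wording.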
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/RowDataTaskWriterFactory.java:
##########
@@ -105,10 +104,17 @@ public RowDataTaskWriterFactory(
}
}
+ void setTable(Table table) {
Review Comment:
Can we define a new constructor that takes a `Supplier<Table>`? The old
constructor can simply forward to the new one. Then
`RefreshableTableSupplier` can be plugged in directly.
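Here is a minimal sketch of the suggested constructor forwarding. It uses simplified stand-ins: `SketchTable` replaces `org.apache.iceberg.Table`, and `SketchTaskWriterFactory` is a hypothetical, trimmed-down factory, not the real `RowDataTaskWriterFactory` signature. The old table-taking constructor wraps its argument in a constant supplier and delegates. This means any `Supplier`, including a refreshing one, can be passed to the new constructor.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for org.apache.iceberg.Table so the sketch compiles on its own.
final class SketchTable {
  private final String location;

  SketchTable(String location) {
    this.location = location;
  }

  String location() {
    return location;
  }
}

// Hypothetical, simplified writer factory; not the real RowDataTaskWriterFactory.
final class SketchTaskWriterFactory {
  private final Supplier<SketchTable> tableSupplier;

  // Old-style constructor: wraps the fixed table in a supplier that always returns it.
  SketchTaskWriterFactory(SketchTable table) {
    this(() -> table);
  }

  // New constructor: callers decide how (and whether) the table is refreshed.
  SketchTaskWriterFactory(Supplier<SketchTable> tableSupplier) {
    this.tableSupplier = tableSupplier;
  }

  // Each call consults the supplier, so a refreshing supplier can hand out a newer table.
  String currentLocation() {
    return tableSupplier.get().location();
  }
}
```

In this sketch, a `setTable(Table)` mutator would no longer be needed, because the refresh behavior lives entirely in the supplier.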
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/TableLoader.java:
##########
@@ -38,6 +38,8 @@ public interface TableLoader extends Closeable, Serializable, Cloneable {
void open();
+ boolean isOpen();
Review Comment:
I guess we can revisit this when we move back to the supplier model.
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/CachingTableLoader.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink;
+
+import java.io.IOException;
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.util.Preconditions;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.util.DateTimeUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * A table loader that will only reload a table after a certain interval has passed. WARNING: This
+ * table loader should be used carefully when used with writer tasks. It could result in heavy load
+ * on a catalog for jobs with many writers.
+ */
+@Experimental
+public class CachingTableLoader implements TableLoader {
Review Comment:
Sorry, there are probably two reasons I changed my mind: (1) I missed how
`Supplier<Table> tableSupplier` was used, e.g. in `ManifestOutputFileFactory`;
and (2) earlier, `TableSupplier` was exposed publicly to users in
`FlinkSink`, and having both `TableLoader` and `TableSupplier` could confuse
users.
Now that it is non-public and hidden from users, I think `TableSupplier`
makes more sense. We can pass the same object directly to
`ManifestOutputFileFactory` and others. I just checked the commit
history, and I actually like the initial commit.
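The following sketch shows the interval-based reload described in the `CachingTableLoader` Javadoc, written as a plain `Supplier`. As the comment suggests, a single object of this kind could then be passed to every consumer that accepts a `Supplier<Table>`. The class name `IntervalRefreshingSupplier`, the loader function, and the injectable clock are all hypothetical; this is not the PR's `TableSupplier` implementation.

```java
import java.time.Duration;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical sketch: reloads the value only after the refresh interval has elapsed.
final class IntervalRefreshingSupplier<T> implements Supplier<T> {
  private final Supplier<T> loader; // e.g. a catalog lookup, assumed to be expensive
  private final long refreshIntervalMillis;
  private final LongSupplier nowMillis; // injectable clock so the behavior is testable
  private T cached;
  private long lastLoadMillis;
  private int loads;

  IntervalRefreshingSupplier(Supplier<T> loader, Duration refreshInterval, LongSupplier nowMillis) {
    this.loader = loader;
    this.refreshIntervalMillis = refreshInterval.toMillis();
    this.nowMillis = nowMillis;
  }

  // synchronized so concurrent callers in one JVM do not trigger duplicate reloads.
  @Override
  public synchronized T get() {
    long now = nowMillis.getAsLong();
    // Load on first access, then only once the interval has passed since the last load.
    if (cached == null || now - lastLoadMillis >= refreshIntervalMillis) {
      cached = loader.get();
      lastLoadMillis = now;
      loads++;
    }
    return cached;
  }

  synchronized int loadCount() {
    return loads;
  }
}
```

Each writer subtask would still hold its own instance. This is why the Javadoc warns about catalog load for jobs with many writers.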
##########
flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/sink/FlinkSink.java:
##########
@@ -266,6 +269,20 @@ public Builder upsert(boolean enabled) {
return this;
}
+ /**
+ * Sets the interval for refreshing the table instance in {@link IcebergStreamWriter}. If not
+ * specified then the default behavior is to not refresh the table, and the initial table
+ * instance initialized is used for the lifetime of the job.
+ *
+ * @param refreshInterval the interval for refreshing the table in writer subtasks
+ * @return {@link Builder} to connect the iceberg table.
+ */
+ @Experimental
+ public Builder tableRefreshInternal(Duration refreshInterval) {
Review Comment:
typo: Internal -> Interval
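This sketch shows the corrected setter name together with the documented default, where no interval means no refresh. `SketchSinkBuilder` is a hypothetical, trimmed-down builder, not the real `FlinkSink.Builder`. The rejection of zero or negative intervals is an added assumption that does not come from the PR.

```java
import java.time.Duration;
import java.util.Objects;

// Hypothetical, simplified builder illustrating only the renamed setter and its default.
final class SketchSinkBuilder {
  // null means "not specified": the initial table instance is used for the lifetime of the job.
  private Duration tableRefreshInterval;

  // Named tableRefreshInterval (not tableRefreshInternal), per the review.
  SketchSinkBuilder tableRefreshInterval(Duration refreshInterval) {
    Objects.requireNonNull(refreshInterval, "refreshInterval must not be null");
    // Assumption for this sketch: only strictly positive intervals make sense.
    if (refreshInterval.isNegative() || refreshInterval.isZero()) {
      throw new IllegalArgumentException("refreshInterval must be positive: " + refreshInterval);
    }
    this.tableRefreshInterval = refreshInterval;
    return this;
  }

  boolean refreshesTable() {
    return tableRefreshInterval != null;
  }
}
```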
--
This is an automated message from the Apache Git Service.