scwhittle commented on code in PR #33504:
URL: https://github.com/apache/beam/pull/33504#discussion_r2005129135


##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -17,44 +17,62 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utility to fetch and cache Iceberg {@link Table}s. */
 class TableCache {
-  private static final Cache<TableIdentifier, Table> CACHE =
-      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();
+  private static final Map<String, Catalog> CATALOG_CACHE = new 
ConcurrentHashMap<>();
+  private static final LoadingCache<String, Table> INTERNAL_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(1, TimeUnit.HOURS)
+          .refreshAfterWrite(3, TimeUnit.MINUTES)
+          .build(
+              new CacheLoader<String, Table>() {
+                @Override
+                public Table load(String identifier) {
+                  return checkStateNotNull(CATALOG_CACHE.get(identifier))
+                      .loadTable(TableIdentifier.parse(identifier));
+                }
+
+                @Override
+                public ListenableFuture<Table> reload(String unusedIdentifier, 
Table table) {
+                  table.refresh();
+                  return Futures.immediateFuture(table);
+                }
+              });;
 
-  static Table get(TableIdentifier identifier, Catalog catalog) {
+  static Table get(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");

Review Comment:
   this doesn't seem to check anything since it's always non-null.
   
   should you instead just improve the caught error to detect if setup wasn't 
called for the identifier and improve logging there?
   



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -17,44 +17,62 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utility to fetch and cache Iceberg {@link Table}s. */
 class TableCache {
-  private static final Cache<TableIdentifier, Table> CACHE =
-      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();
+  private static final Map<String, Catalog> CATALOG_CACHE = new 
ConcurrentHashMap<>();
+  private static final LoadingCache<String, Table> INTERNAL_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(1, TimeUnit.HOURS)
+          .refreshAfterWrite(3, TimeUnit.MINUTES)
+          .build(
+              new CacheLoader<String, Table>() {
+                @Override
+                public Table load(String identifier) {
+                  return checkStateNotNull(CATALOG_CACHE.get(identifier))
+                      .loadTable(TableIdentifier.parse(identifier));
+                }
+
+                @Override
+                public ListenableFuture<Table> reload(String unusedIdentifier, 
Table table) {
+                  table.refresh();
+                  return Futures.immediateFuture(table);
+                }
+              });;
 
-  static Table get(TableIdentifier identifier, Catalog catalog) {
+  static Table get(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");
     try {
-      return CACHE.get(identifier, () -> catalog.loadTable(identifier));
+      return INTERNAL_CACHE.get(identifier);
     } catch (ExecutionException e) {
       throw new RuntimeException(
           "Encountered a problem fetching table " + identifier + " from 
cache.", e);
     }
   }
 
-  static Table get(String identifier, Catalog catalog) {
-    return get(TableIdentifier.parse(identifier), catalog);
-  }
-
-  static Table getRefreshed(TableIdentifier identifier, Catalog catalog) {
-    @Nullable Table table = CACHE.getIfPresent(identifier);
-    if (table == null) {
-      return get(identifier, catalog);
-    }
-    table.refresh();
-    CACHE.put(identifier, table);
-    return table;
+  /** Forces a table refresh and returns. */
+  static Table getRefreshed(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");
+    INTERNAL_CACHE.refresh(identifier);
+    return get(identifier);
   }
 
-  static Table getRefreshed(String identifier, Catalog catalog) {
-    return getRefreshed(TableIdentifier.parse(identifier), catalog);
+  static void setup(IcebergScanConfig scanConfig) {
+    CATALOG_CACHE.putIfAbsent(
+        scanConfig.getTableIdentifier(), 
scanConfig.getCatalogConfig().catalog());

Review Comment:
   if it was already present, should we check the catalog is consistent?



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/TableCache.java:
##########
@@ -17,44 +17,62 @@
  */
 package org.apache.beam.sdk.io.iceberg;
 
+import static org.apache.beam.sdk.util.Preconditions.checkStateNotNull;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
-import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.Cache;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheBuilder;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.CacheLoader;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.cache.LoadingCache;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.Futures;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.util.concurrent.ListenableFuture;
 import org.apache.iceberg.Table;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.TableIdentifier;
-import org.checkerframework.checker.nullness.qual.Nullable;
 
 /** Utility to fetch and cache Iceberg {@link Table}s. */
 class TableCache {
-  private static final Cache<TableIdentifier, Table> CACHE =
-      CacheBuilder.newBuilder().expireAfterWrite(3, TimeUnit.MINUTES).build();
+  private static final Map<String, Catalog> CATALOG_CACHE = new 
ConcurrentHashMap<>();
+  private static final LoadingCache<String, Table> INTERNAL_CACHE =
+      CacheBuilder.newBuilder()
+          .expireAfterAccess(1, TimeUnit.HOURS)
+          .refreshAfterWrite(3, TimeUnit.MINUTES)
+          .build(
+              new CacheLoader<String, Table>() {
+                @Override
+                public Table load(String identifier) {
+                  return checkStateNotNull(CATALOG_CACHE.get(identifier))
+                      .loadTable(TableIdentifier.parse(identifier));
+                }
+
+                @Override
+                public ListenableFuture<Table> reload(String unusedIdentifier, 
Table table) {
+                  table.refresh();
+                  return Futures.immediateFuture(table);
+                }
+              });;
 
-  static Table get(TableIdentifier identifier, Catalog catalog) {
+  static Table get(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");
     try {
-      return CACHE.get(identifier, () -> catalog.loadTable(identifier));
+      return INTERNAL_CACHE.get(identifier);
     } catch (ExecutionException e) {
       throw new RuntimeException(
           "Encountered a problem fetching table " + identifier + " from 
cache.", e);
     }
   }
 
-  static Table get(String identifier, Catalog catalog) {
-    return get(TableIdentifier.parse(identifier), catalog);
-  }
-
-  static Table getRefreshed(TableIdentifier identifier, Catalog catalog) {
-    @Nullable Table table = CACHE.getIfPresent(identifier);
-    if (table == null) {
-      return get(identifier, catalog);
-    }
-    table.refresh();
-    CACHE.put(identifier, table);
-    return table;
+  /** Forces a table refresh and returns. */
+  static Table getRefreshed(String identifier) {
+    checkStateNotNull(INTERNAL_CACHE, "Please call TableCache.setup() first.");

Review Comment:
   ditto



##########
sdks/java/io/iceberg/src/main/java/org/apache/beam/sdk/io/iceberg/ReadUtils.java:
##########
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.iceberg;
+
+import static org.apache.iceberg.util.SnapshotUtil.ancestorsOf;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.function.BiFunction;
+import java.util.stream.Collectors;
+import org.apache.beam.sdk.io.iceberg.IcebergIO.ReadRows.StartingStrategy;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.MoreObjects;
+import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Lists;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.Sets;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Snapshot;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableProperties;
+import org.apache.iceberg.data.IdentityPartitionConverters;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.data.parquet.GenericParquetReaders;
+import org.apache.iceberg.encryption.EncryptedFiles;
+import org.apache.iceberg.encryption.EncryptedInputFile;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileIO;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.mapping.NameMapping;
+import org.apache.iceberg.mapping.NameMappingParser;
+import org.apache.iceberg.parquet.ParquetReader;
+import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.TypeUtil;
+import org.apache.iceberg.util.PartitionUtil;
+import org.apache.iceberg.util.SnapshotUtil;
+import org.apache.parquet.HadoopReadOptions;
+import org.apache.parquet.ParquetReadOptions;
+import org.checkerframework.checker.nullness.qual.Nullable;
+
+/** Helper class for source operations. */
+public class ReadUtils {
+  // default is 8MB. keep this low to avoid overwhelming memory
+  static final int MAX_FILE_BUFFER_SIZE = 1 << 18; // 256KB
+  private static final Collection<String> READ_PROPERTIES_TO_REMOVE =
+      Sets.newHashSet(
+          "parquet.read.filter",
+          "parquet.private.read.filter.predicate",
+          "parquet.read.support.class",
+          "parquet.crypto.factory.class");
+
+  static ParquetReader<Record> createReader(FileScanTask task, Table table) {

Review Comment:
   as future optimization, would it be possible to decode directly from Parquet 
into the beam row without materializing the Record and then converting?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to