adutra commented on code in PR #1395:
URL: https://github.com/apache/polaris/pull/1395#discussion_r2052376090


##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },
+              () -> null);
+      if (res == null) {
+        throw new NoSuchTableException("Table does not exist: %s", 
tableIdentifier.toString());
+      }
+      return Optional.of(res);
+    }
     return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
   }
 
-  public LoadTableResponse loadTableWithAccessDelegation(
+  public static class StreamingLoadTableResponse implements RESTResponse {
+    private String metadataLocation;
+    private InputStream metadata;
+    private Map<String, String> config;
+    private List<Credential> credentials;
+
+    public StreamingLoadTableResponse() {}
+
+    public StreamingLoadTableResponse(
+        String metadataLocation,
+        InputStream metadata,
+        Map<String, String> config,
+        List<Credential> credentials) {
+      this.metadataLocation = metadataLocation;
+      this.metadata = metadata;
+      this.config = config;
+      this.credentials = credentials;
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
+    }
+
+    @JsonProperty("metadata-location")
+    public String metadataLocation() {
+      return this.metadataLocation;
+    }
+
+    @JsonProperty("metadata")
+    @JsonSerialize(using = InputStreamSerializer.class)
+    public InputStream tableMetadata() {
+      return this.metadata;
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> config() {
+      return this.config != null ? this.config : Map.of();
+    }
+
+    @JsonProperty("storage-credentials")
+    public List<Credential> credentials() {
+      return this.credentials != null ? this.credentials : List.of();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("metadataLocation", this.metadataLocation)
+          .add("metadata", this.metadata)
+          .add("config", this.config)
+          .toString();
+    }
+  }
+
+  public static class InputStreamSerializer extends RawSerializer<InputStream> 
{

Review Comment:
   Nit: I would extend `StdSerializer` instead. In spite of its name, 
`RawSerializer` is not a generic serializer for "raw" values.



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },

Review Comment:
   nit:
   
   ```suggestion
                 (fileIO, tableEntity) ->
                     new StreamingLoadTableResponse(
                         tableEntity.getMetadataLocation(),
                         
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
                         Map.of(),
                         List.of()),
   ```



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },
+              () -> null);
+      if (res == null) {
+        throw new NoSuchTableException("Table does not exist: %s", 
tableIdentifier.toString());
+      }
+      return Optional.of(res);
+    }
     return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
   }
 
-  public LoadTableResponse loadTableWithAccessDelegation(
+  public static class StreamingLoadTableResponse implements RESTResponse {
+    private String metadataLocation;
+    private InputStream metadata;
+    private Map<String, String> config;
+    private List<Credential> credentials;
+
+    public StreamingLoadTableResponse() {}
+
+    public StreamingLoadTableResponse(
+        String metadataLocation,
+        InputStream metadata,
+        Map<String, String> config,
+        List<Credential> credentials) {
+      this.metadataLocation = metadataLocation;
+      this.metadata = metadata;
+      this.config = config;
+      this.credentials = credentials;
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
+    }
+
+    @JsonProperty("metadata-location")
+    public String metadataLocation() {
+      return this.metadataLocation;
+    }
+
+    @JsonProperty("metadata")
+    @JsonSerialize(using = InputStreamSerializer.class)
+    public InputStream tableMetadata() {
+      return this.metadata;
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> config() {
+      return this.config != null ? this.config : Map.of();
+    }
+
+    @JsonProperty("storage-credentials")
+    public List<Credential> credentials() {
+      return this.credentials != null ? this.credentials : List.of();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("metadataLocation", this.metadataLocation)
+          .add("metadata", this.metadata)
+          .add("config", this.config)
+          .toString();
+    }
+  }
+
+  public static class InputStreamSerializer extends RawSerializer<InputStream> 
{
+    public InputStreamSerializer() {
+      super(InputStream.class);
+    }
+
+    @Override
+    public void serialize(
+        InputStream inputStream, JsonGenerator jsonGenerator, 
SerializerProvider serializerProvider)
+        throws IOException {
+      LoggerFactory.getLogger(getClass()).warn("Using custom serializer for 
input stream");
+      byte[] buffer = new byte[4096];
+      try (inputStream) {
+        // tell jackson we're about to write a value
+        // this ensures we get the correct token prefixing the value (i.e., a 
: for a field value or
+        // a , for an array)
+        jsonGenerator.writeRawValue("");

Review Comment:
   The following alternative looks slightly less hacky, IMHO:
   
   ```java
           jsonGenerator.writeRawValue("");
           jsonGenerator.flush();
           if (jsonGenerator.getOutputTarget() instanceof OutputStream out) {
             inputStream.transferTo(out);
           } else if (jsonGenerator.getOutputTarget() instanceof Writer writer) 
{
             new InputStreamReader(inputStream, 
StandardCharsets.UTF_8).transferTo(writer);
           } else {
             throw new IllegalStateException(
                 "Cannot serialize InputFile to unknown output target: "
                     + jsonGenerator.getOutputTarget());
           }
   ```



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },
+              () -> null);
+      if (res == null) {
+        throw new NoSuchTableException("Table does not exist: %s", 
tableIdentifier.toString());
+      }
+      return Optional.of(res);
+    }
     return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
   }
 
-  public LoadTableResponse loadTableWithAccessDelegation(
+  public static class StreamingLoadTableResponse implements RESTResponse {
+    private String metadataLocation;
+    private InputStream metadata;
+    private Map<String, String> config;
+    private List<Credential> credentials;
+
+    public StreamingLoadTableResponse() {}
+
+    public StreamingLoadTableResponse(
+        String metadataLocation,
+        InputStream metadata,
+        Map<String, String> config,
+        List<Credential> credentials) {
+      this.metadataLocation = metadataLocation;
+      this.metadata = metadata;
+      this.config = config;
+      this.credentials = credentials;
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
+    }
+
+    @JsonProperty("metadata-location")
+    public String metadataLocation() {
+      return this.metadataLocation;
+    }
+
+    @JsonProperty("metadata")
+    @JsonSerialize(using = InputStreamSerializer.class)
+    public InputStream tableMetadata() {
+      return this.metadata;
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> config() {
+      return this.config != null ? this.config : Map.of();
+    }
+
+    @JsonProperty("storage-credentials")
+    public List<Credential> credentials() {
+      return this.credentials != null ? this.credentials : List.of();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("metadataLocation", this.metadataLocation)
+          .add("metadata", this.metadata)
+          .add("config", this.config)
+          .toString();
+    }
+  }
+
+  public static class InputStreamSerializer extends RawSerializer<InputStream> 
{
+    public InputStreamSerializer() {
+      super(InputStream.class);
+    }
+
+    @Override
+    public void serialize(
+        InputStream inputStream, JsonGenerator jsonGenerator, 
SerializerProvider serializerProvider)
+        throws IOException {
+      LoggerFactory.getLogger(getClass()).warn("Using custom serializer for 
input stream");
+      byte[] buffer = new byte[4096];
+      try (inputStream) {

Review Comment:
   Nit: I would rather model the field as an `InputFile`, then create a 
serializer for `InputFile`. This would make it clear when the input stream is 
created and closed.
   ```suggestion
         try (InputStream inputStream = inputFile.newStream()) {
   ```



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {

Review Comment:
   nit:
   
   ```suggestion
               .getConfiguration(
                   polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)) {
   ```



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },
+              () -> null);
+      if (res == null) {
+        throw new NoSuchTableException("Table does not exist: %s", 
tableIdentifier.toString());
+      }
+      return Optional.of(res);
+    }
     return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
   }
 
-  public LoadTableResponse loadTableWithAccessDelegation(
+  public static class StreamingLoadTableResponse implements RESTResponse {
+    private String metadataLocation;
+    private InputStream metadata;
+    private Map<String, String> config;
+    private List<Credential> credentials;
+
+    public StreamingLoadTableResponse() {}
+
+    public StreamingLoadTableResponse(
+        String metadataLocation,
+        InputStream metadata,
+        Map<String, String> config,
+        List<Credential> credentials) {
+      this.metadataLocation = metadataLocation;
+      this.metadata = metadata;
+      this.config = config;
+      this.credentials = credentials;
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
+    }
+
+    @JsonProperty("metadata-location")
+    public String metadataLocation() {
+      return this.metadataLocation;
+    }
+
+    @JsonProperty("metadata")
+    @JsonSerialize(using = InputStreamSerializer.class)
+    public InputStream tableMetadata() {
+      return this.metadata;
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> config() {
+      return this.config != null ? this.config : Map.of();
+    }
+
+    @JsonProperty("storage-credentials")
+    public List<Credential> credentials() {
+      return this.credentials != null ? this.credentials : List.of();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("metadataLocation", this.metadataLocation)
+          .add("metadata", this.metadata)
+          .add("config", this.config)
+          .toString();
+    }
+  }

Review Comment:
   Could be a record:
   
   ```suggestion
     public record StreamingLoadTableResponse(
         @JsonProperty("metadata-location") String metadataLocation,
         @JsonProperty("metadata") @JsonSerialize(using = 
InputStreamSerializer.class)
             InputStream metadata,
         @JsonProperty("config") Map<String, String> config,
         @JsonProperty("storage-credentials") List<Credential> credentials)
         implements RESTResponse {
   
       @Override
       public void validate() {
         Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
       }
     }
   ```



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/SupportsCredentialDelegation.java:
##########
@@ -36,4 +38,12 @@ Map<String, String> getCredentialConfig(
       TableIdentifier tableIdentifier,
       TableMetadata tableMetadata,
       Set<PolarisStorageActions> storageActions);
+
+  Map<String, String> getCredentialConfig(

Review Comment:
   Is this new overload unrelated to this PR?



##########
service/common/src/main/java/org/apache/polaris/service/catalog/iceberg/IcebergCatalogHandler.java:
##########
@@ -528,14 +543,177 @@ public Optional<LoadTableResponse> loadTableIfStale(
       }
     }
 
+    PolarisCallContext polarisCallContext = 
callContext.getPolarisCallContext();
+    if (baseCatalog instanceof IcebergCatalog ic
+        && polarisCallContext
+            .getConfigurationStore()
+            .getConfiguration(
+                polarisCallContext, 
FeatureConfiguration.ENABLE_STREAMING_TABLE_METADATA)
+            .equals(true)) {
+      IcebergCatalog.TableMetadataProvider ops =
+          (IcebergCatalog.TableMetadataProvider) 
ic.newTableOps(tableIdentifier);
+      StreamingLoadTableResponse res =
+          ops.processLatestMetadata(
+              (fileIO, tableEntity) -> {
+                return new StreamingLoadTableResponse(
+                    tableEntity.getMetadataLocation(),
+                    
fileIO.newInputFile(tableEntity.getMetadataLocation()).newStream(),
+                    Map.of(),
+                    List.of());
+              },
+              () -> null);
+      if (res == null) {
+        throw new NoSuchTableException("Table does not exist: %s", 
tableIdentifier.toString());
+      }
+      return Optional.of(res);
+    }
     return Optional.of(CatalogHandlers.loadTable(baseCatalog, 
tableIdentifier));
   }
 
-  public LoadTableResponse loadTableWithAccessDelegation(
+  public static class StreamingLoadTableResponse implements RESTResponse {
+    private String metadataLocation;
+    private InputStream metadata;
+    private Map<String, String> config;
+    private List<Credential> credentials;
+
+    public StreamingLoadTableResponse() {}
+
+    public StreamingLoadTableResponse(
+        String metadataLocation,
+        InputStream metadata,
+        Map<String, String> config,
+        List<Credential> credentials) {
+      this.metadataLocation = metadataLocation;
+      this.metadata = metadata;
+      this.config = config;
+      this.credentials = credentials;
+    }
+
+    @Override
+    public void validate() {
+      Preconditions.checkNotNull(this.metadata, "Invalid metadata: null");
+    }
+
+    @JsonProperty("metadata-location")
+    public String metadataLocation() {
+      return this.metadataLocation;
+    }
+
+    @JsonProperty("metadata")
+    @JsonSerialize(using = InputStreamSerializer.class)
+    public InputStream tableMetadata() {
+      return this.metadata;
+    }
+
+    @JsonProperty("config")
+    public Map<String, String> config() {
+      return this.config != null ? this.config : Map.of();
+    }
+
+    @JsonProperty("storage-credentials")
+    public List<Credential> credentials() {
+      return this.credentials != null ? this.credentials : List.of();
+    }
+
+    @Override
+    public String toString() {
+      return MoreObjects.toStringHelper(this)
+          .add("metadataLocation", this.metadataLocation)
+          .add("metadata", this.metadata)
+          .add("config", this.config)
+          .toString();
+    }
+  }
+
+  public static class InputStreamSerializer extends RawSerializer<InputStream> 
{
+    public InputStreamSerializer() {
+      super(InputStream.class);
+    }
+
+    @Override
+    public void serialize(
+        InputStream inputStream, JsonGenerator jsonGenerator, 
SerializerProvider serializerProvider)
+        throws IOException {
+      LoggerFactory.getLogger(getClass()).warn("Using custom serializer for 
input stream");
+      byte[] buffer = new byte[4096];
+      try (inputStream) {
+        // tell jackson we're about to write a value
+        // this ensures we get the correct token prefixing the value (i.e., a 
: for a field value or
+        // a , for an array)
+        jsonGenerator.writeRawValue("");
+        for (int read = inputStream.read(buffer); read >= 0; read = 
inputStream.read(buffer)) {
+          char[] charArray = new String(buffer, 0, read, 
StandardCharsets.UTF_8).toCharArray();
+          jsonGenerator.writeRaw(charArray, 0, charArray.length);
+        }
+      }
+    }
+  }
+
+  public RESTResponse loadTableWithAccessDelegation(
       TableIdentifier tableIdentifier, String snapshots) {
     return loadTableWithAccessDelegationIfStale(tableIdentifier, null, 
snapshots).get();
   }
 
+  public LoadCredentialsResponse loadAccessDelegation(

Review Comment:
   This addition looks unrelated to this PR. Is it needed here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@polaris.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to