ajayky-os commented on code in PR #14333:
URL: https://github.com/apache/iceberg/pull/14333#discussion_r2576418282


##########
gcp-bundle/build.gradle:
##########
@@ -28,6 +28,7 @@ project(":iceberg-gcp-bundle") {
     implementation "com.google.cloud:google-cloud-storage"
     implementation "com.google.cloud:google-cloud-bigquery"
     implementation "com.google.cloud:google-cloud-core"
+    implementation "com.google.cloud.gcs.analytics:gcs-analytics-core:1.1.2"

Review Comment:
   Updated.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java:
##########
@@ -64,6 +78,33 @@ public long getLength() {
 
   @Override
   public SeekableInputStream newStream() {
+    if (gcpProperties().isGcsAnalyticsCoreEnabled()) {
+      try {
+        GcsFileInfo fileInfo = getGcsFileInfo();
+        return new GoogleCloudStorageInputStreamWrapper(
+            GoogleCloudStorageInputStream.create(gcsFileSystem(), fileInfo));
+      } catch (IOException e) {
+        LOG.error("Failed to create GCS analytics core  input stream.", e);
+        throw new RuntimeIOException(
+            e, "Failed to create GCS analytics core input stream for: %s", 
blobId().toGsUtilUri());
+      }
+    }
+
     return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(), 
metrics());
   }
+
+  GcsFileInfo getGcsFileInfo() {
+    BlobId blobId = blobId();
+    GcsItemId itemId =
+        GcsItemId.builder()
+            .setBucketName(blobId.getBucket())
+            .setObjectName(blobId.getName())
+            .build();
+    GcsItemInfo itemInfo = 
GcsItemInfo.builder().setItemId(itemId).setSize(getLength()).build();

Review Comment:
   Thank you for pointing out the issue. I updated the implementation to avoid
calling `getLength()` and to use the length only when it is already available.
`gcsFileInfo()` is now only called when `blobSize != null`.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {
+    GCPProperties gcpProperties = new GCPProperties(properties);
+    if (gcpProperties.oauth2Token().isPresent()) {
+      return GCPAuthUtils.oauth2CredentialsFromGcpProperties(
+          new GCPProperties(properties), closeableGroup);
+    } else if (gcpProperties.noAuth()) {
+      return NoCredentials.getInstance();
+    } else {
+      return null;
+    }
+  }
+
+  public GcsFileSystem gcsFileSystem() {
+    if (gcsFileSystem == null) {
+      synchronized (this) {
+        if (gcsFileSystem == null) {
+          this.gcsFileSystem = gcsFileSystemSupplier.get();
+        }
+      }
+    }
+    return this.gcsFileSystem;
+  }
+
+  private SerializableSupplier<GcsFileSystem> getGcsFileSystemSupplier(
+      Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> propertiesWithUserAgent =
+        new ImmutableMap.Builder<String, String>()
+            .putAll(properties)
+            .put("user-agent", GCS_FILE_IO_USER_AGENT);
+    GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
+        new GcsAnalyticsCoreOptions("", propertiesWithUserAgent.build());
+    GcsFileSystemOptions fileSystemOptions = 
gcsAnalyticsCoreOptions.getGcsFileSystemOptions();

Review Comment:
   Currently, `GcsFileSystem` supports all of the configuration options specific
to read operations, and `GcsFileSystem` is used only for reads. The supported
options are documented here:
   
https://github.com/GoogleCloudPlatform/gcs-analytics-core/blob/main/CONFIGURATION.md



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {
+    GCPProperties gcpProperties = new GCPProperties(properties);
+    if (gcpProperties.oauth2Token().isPresent()) {
+      return GCPAuthUtils.oauth2CredentialsFromGcpProperties(
+          new GCPProperties(properties), closeableGroup);
+    } else if (gcpProperties.noAuth()) {
+      return NoCredentials.getInstance();
+    } else {
+      return null;
+    }
+  }
+
+  public GcsFileSystem gcsFileSystem() {

Review Comment:
   Updated the visibility to package-private.



##########
build.gradle:
##########
@@ -717,6 +717,7 @@ project(':iceberg-gcp') {
     compileOnly platform(libs.google.libraries.bom)
     compileOnly "com.google.cloud:google-cloud-storage"
     compileOnly "com.google.cloud:google-cloud-kms"
+    compileOnly "com.google.cloud.gcs.analytics:gcs-analytics-core:1.1.2"

Review Comment:
   Done.



##########
gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class IntegrationTestGcsFileIO {
+
+  private static final String BUCKET = "test-bucket";
+  private static final String PROJECT_ID = "test-project";
+  private static final int GCS_EMULATOR_PORT = 4443;
+  private final Random random = new Random(1);
+
+  @Container
+  private static final GenericContainer<?> GCS_EMULATOR =
+      new GenericContainer<>("fsouza/fake-gcs-server:latest")
+          .withExposedPorts(GCS_EMULATOR_PORT)
+          .withCreateContainerCmdModifier(
+              cmd ->
+                  cmd.withHostConfig(
+                      new com.github.dockerjava.api.model.HostConfig()

Review Comment:
   Done.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java:
##########
@@ -64,6 +78,33 @@ public long getLength() {
 
   @Override
   public SeekableInputStream newStream() {
+    if (gcpProperties().isGcsAnalyticsCoreEnabled()) {
+      try {
+        GcsFileInfo fileInfo = getGcsFileInfo();
+        return new GoogleCloudStorageInputStreamWrapper(
+            GoogleCloudStorageInputStream.create(gcsFileSystem(), fileInfo));
+      } catch (IOException e) {
+        LOG.error("Failed to create GCS analytics core  input stream.", e);
+        throw new RuntimeIOException(
+            e, "Failed to create GCS analytics core input stream for: %s", 
blobId().toGsUtilUri());
+      }
+    }
+
     return new GCSInputStream(storage(), blobId(), blobSize, gcpProperties(), 
metrics());
   }
+
+  GcsFileInfo getGcsFileInfo() {

Review Comment:
   Done.



##########
gcp/src/integration/java/org/apache/iceberg/gcp/gcs/IntegrationTestGcsFileIO.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class IntegrationTestGcsFileIO {
+
+  private static final String BUCKET = "test-bucket";
+  private static final String PROJECT_ID = "test-project";
+  private static final int GCS_EMULATOR_PORT = 4443;
+  private final Random random = new Random(1);
+
+  @Container
+  private static final GenericContainer<?> GCS_EMULATOR =
+      new GenericContainer<>("fsouza/fake-gcs-server:latest")
+          .withExposedPorts(GCS_EMULATOR_PORT)
+          .withCreateContainerCmdModifier(
+              cmd ->
+                  cmd.withHostConfig(
+                      new com.github.dockerjava.api.model.HostConfig()
+                          .withPortBindings(
+                              new com.github.dockerjava.api.model.PortBinding(
+                                  
com.github.dockerjava.api.model.Ports.Binding.bindPort(
+                                      GCS_EMULATOR_PORT),
+                                  new 
com.github.dockerjava.api.model.ExposedPort(
+                                      GCS_EMULATOR_PORT)))))
+          .withCommand(
+              "-scheme",
+              "http",
+              "-external-url",
+              String.format("http://localhost:%d";, GCS_EMULATOR_PORT))
+          .waitingFor(
+              new HttpWaitStrategy()
+                  .forPort(GCS_EMULATOR_PORT)
+                  .forPath("/storage/v1/b")
+                  .forStatusCode(200)
+                  .withStartupTimeout(Duration.ofMinutes(2)));
+
+  private GCSFileIO fileIO;
+  private static Storage storage;
+
+  @BeforeAll
+  public static void beforeClass() {
+    GCS_EMULATOR.start();
+    String endpoint = String.format("http://localhost:%d";, GCS_EMULATOR_PORT);
+    StorageOptions options =
+        StorageOptions.newBuilder()
+            .setProjectId(PROJECT_ID)
+            .setHost(endpoint)
+            .setCredentials(NoCredentials.getInstance())
+            .build();
+    storage = options.getService();
+    storage.create(BucketInfo.of(BUCKET));
+  }
+
+  @AfterAll
+  public static void afterClass() {
+    if (storage != null) {
+      storage.delete(BUCKET);
+    }
+    GCS_EMULATOR.stop();
+  }
+
+  @BeforeEach
+  public void before() {
+    fileIO = new GCSFileIO(() -> storage);
+    fileIO.initialize(ImmutableMap.of());
+    for (Blob blob : storage.list(BUCKET).iterateAll()) {
+      storage.delete(blob.getBlobId());
+    }
+  }
+
+  @AfterEach
+  public void after() {
+    for (Blob blob : storage.list(BUCKET).iterateAll()) {
+      storage.delete(blob.getBlobId());
+    }
+  }
+
+  @Test
+  public void testNewInputFileGcsAnalyticsCoreDisabled() throws IOException {

Review Comment:
   Updated the test cases to avoid this.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream
+    implements RangeReadable {
+  private GoogleCloudStorageInputStream googleCloudStorageInputStream;
+
+  public GoogleCloudStorageInputStreamWrapper(
+      GoogleCloudStorageInputStream googleCloudStorageInputStream) {
+    this.googleCloudStorageInputStream = googleCloudStorageInputStream;

Review Comment:
   Done.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream
+    implements RangeReadable {
+  private GoogleCloudStorageInputStream googleCloudStorageInputStream;

Review Comment:
   Done



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java:
##########
@@ -37,6 +49,7 @@ static GCSInputFile fromLocation(
       String location, long length, PrefixedStorage storage, MetricsContext 
metrics) {
     return new GCSInputFile(
         storage.storage(),
+        storage.gcsFileSystem(),

Review Comment:
   My reasons for not passing `storage` directly are:
   - Passing each service individually through the constructor explicitly
declares the services these classes need in order to function.
   - `PrefixedStorage` contains the implementation details of creating and
configuring `Storage` and `GcsFileSystem`. Keeping `GCSInputFile` and
`GCSOutputFile` decoupled from it makes them easier to test.
   
   Let me know your perspective.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSOutputFile.java:
##########
@@ -34,12 +35,20 @@ class GCSOutputFile extends BaseGCSFile implements 
OutputFile {
   static GCSOutputFile fromLocation(
       String location, PrefixedStorage storage, MetricsContext metrics) {
     return new GCSOutputFile(
-        storage.storage(), BlobId.fromGsUtilUri(location), 
storage.gcpProperties(), metrics);
+        storage.storage(),
+        storage.gcsFileSystem(),

Review Comment:
   Currently, `GcsFileSystem` is passed to `GCSOutputFile` only to support
`toInputFile()`, which is used for reads.
   
   We plan to add features (gRPC support, zonal buckets, etc.) and
optimizations to the write path, and we are working on updating the documentation.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {
+    GCPProperties gcpProperties = new GCPProperties(properties);
+    if (gcpProperties.oauth2Token().isPresent()) {
+      return GCPAuthUtils.oauth2CredentialsFromGcpProperties(
+          new GCPProperties(properties), closeableGroup);
+    } else if (gcpProperties.noAuth()) {
+      return NoCredentials.getInstance();
+    } else {
+      return null;
+    }
+  }
+
+  public GcsFileSystem gcsFileSystem() {
+    if (gcsFileSystem == null) {
+      synchronized (this) {
+        if (gcsFileSystem == null) {
+          this.gcsFileSystem = gcsFileSystemSupplier.get();
+        }
+      }
+    }
+    return this.gcsFileSystem;
+  }
+
+  private SerializableSupplier<GcsFileSystem> getGcsFileSystemSupplier(
+      Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> propertiesWithUserAgent =
+        new ImmutableMap.Builder<String, String>()
+            .putAll(properties)
+            .put("user-agent", GCS_FILE_IO_USER_AGENT);
+    GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
+        new GcsAnalyticsCoreOptions("", propertiesWithUserAgent.build());
+    GcsFileSystemOptions fileSystemOptions = 
gcsAnalyticsCoreOptions.getGcsFileSystemOptions();
+    if (this.closeableGroup == null) {
+      this.closeableGroup = new CloseableGroup();
+    }
+    Credentials credentials = getCredentials(properties, closeableGroup);

Review Comment:
   The credentials passed to `Storage` in the default implementation and to
`GcsFileSystem` are identical. IIUC, the credential refresh used by the REST
implementation is ultimately handled in
[OAuth2RefreshCredentialsHandler.java](https://github.com/apache/iceberg/blob/main/gcp/src/main/java/org/apache/iceberg/gcp/gcs/OAuth2RefreshCredentialsHandler.java),
which is shared by both implementations.
   
   We also tested the implementation with credential vending enabled for the
BigLake catalog, which is a REST catalog implementation:
https://docs.cloud.google.com/biglake/docs/blms-rest-catalog#configure-catalog.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream

Review Comment:
   `GCSInputStream` appears to capture metrics only in its `read(...)` methods,
so I added the same metrics to the wrapper implementation.
   
   The default implementation of
[readVectored](https://github.com/apache/iceberg/blob/a2ac14180ecb8e38bb8172e715172b26194b763d/api/src/main/java/org/apache/iceberg/io/RangeReadable.java#L105)
uses `readFully()`, which does not capture these metrics; the same is true of
the `readTail` method. The existing metrics do not seem to be updated
consistently. We can address this separately to make the metrics consistent.



##########
gcp/src/integration/java/org/apache/iceberg/gcp/gcs/IntegrationTestGcsFileIO.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class IntegrationTestGcsFileIO {

Review Comment:
   Done



##########
gcp/src/integration/java/org/apache/iceberg/gcp/gcs/IntegrationTestGcsFileIO.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class IntegrationTestGcsFileIO {
+
+  private static final String BUCKET = "test-bucket";
+  private static final String PROJECT_ID = "test-project";
+  private static final int GCS_EMULATOR_PORT = 4443;
+  private final Random random = new Random(1);
+
+  @Container
+  private static final GenericContainer<?> GCS_EMULATOR =
+      new GenericContainer<>("fsouza/fake-gcs-server:latest")
+          .withExposedPorts(GCS_EMULATOR_PORT)
+          .withCreateContainerCmdModifier(
+              cmd ->
+                  cmd.withHostConfig(
+                      new com.github.dockerjava.api.model.HostConfig()
+                          .withPortBindings(
+                              new com.github.dockerjava.api.model.PortBinding(
+                                  
com.github.dockerjava.api.model.Ports.Binding.bindPort(
+                                      GCS_EMULATOR_PORT),
+                                  new 
com.github.dockerjava.api.model.ExposedPort(
+                                      GCS_EMULATOR_PORT)))))
+          .withCommand(
+              "-scheme",
+              "http",
+              "-external-url",
+              String.format("http://localhost:%d";, GCS_EMULATOR_PORT))
+          .waitingFor(
+              new HttpWaitStrategy()
+                  .forPort(GCS_EMULATOR_PORT)
+                  .forPath("/storage/v1/b")
+                  .forStatusCode(200)
+                  .withStartupTimeout(Duration.ofMinutes(2)));
+
+  private GCSFileIO fileIO;
+  private static Storage storage;
+
+  @BeforeAll
+  public static void beforeClass() {
+    GCS_EMULATOR.start();
+    String endpoint = String.format("http://localhost:%d";, GCS_EMULATOR_PORT);
+    StorageOptions options =
+        StorageOptions.newBuilder()
+            .setProjectId(PROJECT_ID)
+            .setHost(endpoint)
+            .setCredentials(NoCredentials.getInstance())
+            .build();
+    storage = options.getService();
+    storage.create(BucketInfo.of(BUCKET));
+  }
+
+  @AfterAll
+  public static void afterClass() {
+    if (storage != null) {
+      storage.delete(BUCKET);
+    }
+    GCS_EMULATOR.stop();
+  }
+
+  @BeforeEach
+  public void before() {
+    fileIO = new GCSFileIO(() -> storage);
+    fileIO.initialize(ImmutableMap.of());
+    for (Blob blob : storage.list(BUCKET).iterateAll()) {
+      storage.delete(blob.getBlobId());
+    }
+  }
+
+  @AfterEach
+  public void after() {
+    for (Blob blob : storage.list(BUCKET).iterateAll()) {
+      storage.delete(blob.getBlobId());
+    }
+  }
+
+  @Test
+  public void testNewInputFileGcsAnalyticsCoreDisabled() throws IOException {
+    String location = String.format("gs://%s/path/to/file.txt", BUCKET);
+    byte[] expected = new byte[1024 * 1024];
+    random.nextBytes(expected);
+    
storage.create(BlobInfo.newBuilder(BlobId.fromGsUtilUri(location)).build(), 
expected);
+    InputFile in = fileIO.newInputFile(location);
+    byte[] actual = new byte[1024 * 1024];
+
+    try (InputStream is = in.newStream()) {
+      IOUtil.readFully(is, actual, 0, expected.length);
+    }
+
+    assertThat(actual).isEqualTo(expected);
+  }
+
+  @Test
+  public void testNewInputFileGcsAnalyticsCoreEnabled() throws IOException {
+    String location = String.format("gs://%s/path/to/file.txt", BUCKET);
+    byte[] expected = new byte[1024 * 1024];
+    random.nextBytes(expected);
+    
storage.create(BlobInfo.newBuilder(BlobId.fromGsUtilUri(location)).build(), 
expected);
+    fileIO.initialize(
+        ImmutableMap.of(
+            "gcs.analytics-core.enabled", "true",

Review Comment:
   Done.



##########
gcp/src/integration/java/org/apache/iceberg/gcp/gcs/IntegrationTestGcsFileIO.java:
##########
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import com.google.cloud.NoCredentials;
+import com.google.cloud.storage.Blob;
+import com.google.cloud.storage.BlobId;
+import com.google.cloud.storage.BlobInfo;
+import com.google.cloud.storage.BucketInfo;
+import com.google.cloud.storage.Storage;
+import com.google.cloud.storage.StorageOptions;
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.util.List;
+import java.util.Random;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileInfo;
+import org.apache.iceberg.io.IOUtil;
+import org.apache.iceberg.io.InputFile;
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
+import org.apache.iceberg.relocated.com.google.common.collect.Lists;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.wait.strategy.HttpWaitStrategy;
+import org.testcontainers.junit.jupiter.Container;
+import org.testcontainers.junit.jupiter.Testcontainers;
+
+@Testcontainers
+public class IntegrationTestGcsFileIO {
+
+  private static final String BUCKET = "test-bucket";
+  private static final String PROJECT_ID = "test-project";
+  private static final int GCS_EMULATOR_PORT = 4443;
+  private final Random random = new Random(1);

Review Comment:
   Done.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java:
##########
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.gcp.gcs;
+
+import com.google.cloud.gcs.analyticscore.client.GcsObjectRange;
+import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.IntFunction;
+import java.util.stream.Collectors;
+import org.apache.iceberg.io.FileRange;
+import org.apache.iceberg.io.RangeReadable;
+import org.apache.iceberg.io.SeekableInputStream;
+
+public class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream

Review Comment:
   Done



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java:
##########
@@ -64,6 +78,33 @@ public long getLength() {
 
   @Override
   public SeekableInputStream newStream() {
+    if (gcpProperties().isGcsAnalyticsCoreEnabled()) {
+      try {
+        GcsFileInfo fileInfo = getGcsFileInfo();
+        return new GoogleCloudStorageInputStreamWrapper(
+            GoogleCloudStorageInputStream.create(gcsFileSystem(), fileInfo));
+      } catch (IOException e) {
+        LOG.error("Failed to create GCS analytics core  input stream.", e);
+        throw new RuntimeIOException(

Review Comment:
   SGTM. I updated the implementation to fall back to `GCSInputStream`.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {
+    GCPProperties gcpProperties = new GCPProperties(properties);
+    if (gcpProperties.oauth2Token().isPresent()) {
+      return GCPAuthUtils.oauth2CredentialsFromGcpProperties(
+          new GCPProperties(properties), closeableGroup);
+    } else if (gcpProperties.noAuth()) {
+      return NoCredentials.getInstance();
+    } else {
+      return null;
+    }
+  }
+
+  public GcsFileSystem gcsFileSystem() {
+    if (gcsFileSystem == null) {
+      synchronized (this) {
+        if (gcsFileSystem == null) {
+          this.gcsFileSystem = gcsFileSystemSupplier.get();
+        }
+      }
+    }
+    return this.gcsFileSystem;
+  }
+
+  private SerializableSupplier<GcsFileSystem> getGcsFileSystemSupplier(
+      Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> propertiesWithUserAgent =
+        new ImmutableMap.Builder<String, String>()
+            .putAll(properties)
+            .put("user-agent", GCS_FILE_IO_USER_AGENT);
+    GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
+        new GcsAnalyticsCoreOptions("", propertiesWithUserAgent.build());

Review Comment:
   Thanks for pointing this out. It should have been `"gcs."` in place of the empty 
string; I added a unit test for it.
   
   In Iceberg, config keys are prefixed with `gcs.` (e.g. `gcs.project-id`), 
while in the Hadoop connector the equivalent flags are prefixed with `fs.gs.` 
(e.g. `fs.gs.project-id`). We intend to use the same implementation in both 
places, so the prefix argument lets us avoid duplicating the handling of these 
equivalent flags.
   



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {

Review Comment:
   Done.



##########
gcp/src/main/java/org/apache/iceberg/gcp/gcs/PrefixedStorage.java:
##########
@@ -117,5 +126,54 @@ public void close() {
       // GCS Storage does not appear to be closable, so release the reference
       storage = null;
     }
+
+    if (null != gcsFileSystem) {
+      gcsFileSystem.close();
+      gcsFileSystem = null;
+    }
+  }
+
+  static Credentials getCredentials(Map<String, String> properties, 
CloseableGroup closeableGroup) {
+    GCPProperties gcpProperties = new GCPProperties(properties);
+    if (gcpProperties.oauth2Token().isPresent()) {
+      return GCPAuthUtils.oauth2CredentialsFromGcpProperties(
+          new GCPProperties(properties), closeableGroup);
+    } else if (gcpProperties.noAuth()) {
+      return NoCredentials.getInstance();
+    } else {
+      return null;
+    }
+  }
+
+  public GcsFileSystem gcsFileSystem() {
+    if (gcsFileSystem == null) {
+      synchronized (this) {
+        if (gcsFileSystem == null) {
+          this.gcsFileSystem = gcsFileSystemSupplier.get();
+        }
+      }
+    }
+    return this.gcsFileSystem;
+  }
+
+  private SerializableSupplier<GcsFileSystem> getGcsFileSystemSupplier(
+      Map<String, String> properties) {
+    ImmutableMap.Builder<String, String> propertiesWithUserAgent =
+        new ImmutableMap.Builder<String, String>()
+            .putAll(properties)
+            .put("user-agent", GCS_FILE_IO_USER_AGENT);
+    GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions =
+        new GcsAnalyticsCoreOptions("", propertiesWithUserAgent.build());
+    GcsFileSystemOptions fileSystemOptions = 
gcsAnalyticsCoreOptions.getGcsFileSystemOptions();
+    if (this.closeableGroup == null) {
+      this.closeableGroup = new CloseableGroup();

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to