nastra commented on code in PR #14333: URL: https://github.com/apache/iceberg/pull/14333#discussion_r2597837215
########## gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports.Binding; +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.InputFile; 
+import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TestGcsFileIO { + + private static final String BUCKET = "test-bucket"; + private static final String PROJECT_ID = "test-project"; + private static final int GCS_EMULATOR_PORT = 4443; + private static final Random RANDOM = new Random(1); + + @Container + private static final GenericContainer<?> GCS_EMULATOR = + new GenericContainer<>("fsouza/fake-gcs-server:latest") + .withExposedPorts(GCS_EMULATOR_PORT) + .withCreateContainerCmdModifier( + cmd -> + cmd.withHostConfig( + new HostConfig() + .withPortBindings( + new PortBinding( + Binding.bindPort(GCS_EMULATOR_PORT), + new ExposedPort(GCS_EMULATOR_PORT))))) + .withCommand( + "-scheme", + "http", + "-external-url", + String.format("http://localhost:%d", GCS_EMULATOR_PORT)) + .waitingFor( + new HttpWaitStrategy() + .forPort(GCS_EMULATOR_PORT) + .forPath("/storage/v1/b") + .forStatusCode(200) + .withStartupTimeout(Duration.ofMinutes(2))); + + private GCSFileIO fileIO; + private static Storage storage; + + @BeforeAll + public static void beforeClass() { + GCS_EMULATOR.start(); Review Comment: container start should already be handled by the @Testcontainers/@Container annotations, I believe ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/GCSInputFile.java: ########## @@ -64,6 +77,45 @@ public long getLength() { @Override public SeekableInputStream newStream() { + if (gcpProperties().isGcsAnalyticsCoreEnabled()) { + try { + return 
newGoogleCloudStorageInputStream(); + } catch (IOException e) { + LOG.error( + "Failed to create GCS analytics core input stream for {}, falling back to default.", + uri(), + e); + } + } Review Comment: nit: add a newline after the closing } here and in other places ########## gcp/src/integration/java/org/apache/iceberg/gcp/gcs/TestGcsFileIO.java: ########## @@ -0,0 +1,248 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp.gcs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; + +import com.github.dockerjava.api.model.ExposedPort; +import com.github.dockerjava.api.model.HostConfig; +import com.github.dockerjava.api.model.PortBinding; +import com.github.dockerjava.api.model.Ports.Binding; +import com.google.cloud.NoCredentials; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.BlobInfo; +import com.google.cloud.storage.BucketInfo; +import com.google.cloud.storage.Storage; +import com.google.cloud.storage.StorageException; +import com.google.cloud.storage.StorageOptions; +import java.io.IOException; +import java.io.InputStream; +import java.time.Duration; +import java.util.List; +import java.util.Random; +import java.util.stream.Collectors; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.FileInfo; +import org.apache.iceberg.io.IOUtil; +import org.apache.iceberg.io.InputFile; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; +import org.junit.jupiter.api.AfterAll; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeAll; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.wait.strategy.HttpWaitStrategy; +import org.testcontainers.junit.jupiter.Container; +import org.testcontainers.junit.jupiter.Testcontainers; + +@Testcontainers +public class TestGcsFileIO { + + private static final String BUCKET = "test-bucket"; + private static final String PROJECT_ID = "test-project"; + private static final int GCS_EMULATOR_PORT = 4443; + private static final Random RANDOM = new Random(1); + + @Container + private static final GenericContainer<?> GCS_EMULATOR = 
+ new GenericContainer<>("fsouza/fake-gcs-server:latest") + .withExposedPorts(GCS_EMULATOR_PORT) + .withCreateContainerCmdModifier( + cmd -> + cmd.withHostConfig( + new HostConfig() + .withPortBindings( + new PortBinding( + Binding.bindPort(GCS_EMULATOR_PORT), + new ExposedPort(GCS_EMULATOR_PORT))))) + .withCommand( + "-scheme", + "http", + "-external-url", + String.format("http://localhost:%d", GCS_EMULATOR_PORT)) + .waitingFor( + new HttpWaitStrategy() + .forPort(GCS_EMULATOR_PORT) + .forPath("/storage/v1/b") + .forStatusCode(200) + .withStartupTimeout(Duration.ofMinutes(2))); + + private GCSFileIO fileIO; + private static Storage storage; + + @BeforeAll + public static void beforeClass() { + GCS_EMULATOR.start(); + String endpoint = String.format("http://localhost:%d", GCS_EMULATOR_PORT); + StorageOptions options = + StorageOptions.newBuilder() + .setProjectId(PROJECT_ID) + .setHost(endpoint) + .setCredentials(NoCredentials.getInstance()) + .build(); + storage = options.getService(); + storage.create(BucketInfo.of(BUCKET)); + } + + @AfterAll + public static void afterClass() { + if (storage != null) { + storage.delete(BUCKET); + } + GCS_EMULATOR.stop(); Review Comment: stopping should also be already handled ########## gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGoogleCloudStorageInputStreamWrapper.java: ########## @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.function.IntFunction; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.metrics.MetricsContext; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.Mockito; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class TestGoogleCloudStorageInputStreamWrapper { Review Comment: ```suggestion public class TestGCSInputStreamWrapper { ``` ########## gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.google.cloud.gcs.analyticscore.client.GcsFileInfo; +import com.google.cloud.gcs.analyticscore.client.GcsFileSystem; +import com.google.cloud.gcs.analyticscore.client.GcsItemId; +import com.google.cloud.gcs.analyticscore.client.GcsItemInfo; +import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; + +public class TestGcsInputFile { + + private static final String TEST_BUCKET = "TEST_BUCKET"; + private static final String KEY = "file/path/a.dat"; + private static final String LOCATION = "gs://" + TEST_BUCKET + "/" + KEY; + private static final 
long FILE_SIZE = 1024L; + + private Storage storage; + private GcsFileSystem gcsFileSystem; + private GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions; + private PrefixedStorage prefixedStorage; + private GCPProperties gcpProperties; + private MetricsContext metricsContext; + private Blob blob; + + @BeforeEach + public void before() { + storage = mock(Storage.class); + gcsFileSystem = mock(GcsFileSystem.class); + prefixedStorage = mock(PrefixedStorage.class); + gcsAnalyticsCoreOptions = new GcsAnalyticsCoreOptions("", Collections.emptyMap()); + gcpProperties = new GCPProperties(); + metricsContext = MetricsContext.nullMetrics(); + blob = mock(Blob.class); + when(prefixedStorage.storage()).thenReturn(storage); + when(prefixedStorage.gcsFileSystem()).thenReturn(gcsFileSystem); + when(prefixedStorage.gcpProperties()).thenReturn(gcpProperties); + when(gcsFileSystem.getFileSystemOptions()) + .thenReturn(gcsAnalyticsCoreOptions.getGcsFileSystemOptions()); + when(storage.get(any(BlobId.class))).thenReturn(blob); + when(blob.getSize()).thenReturn(FILE_SIZE); + } + + @Test + public void fromLocation() { + GCSInputFile inputFile = GCSInputFile.fromLocation(LOCATION, prefixedStorage, metricsContext); + + assertThat(inputFile.blobId()).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void fromLocationWithLength() { + GCSInputFile inputFile = + GCSInputFile.fromLocation(LOCATION, FILE_SIZE, prefixedStorage, metricsContext); + + assertThat(inputFile.blobId()).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void getLength() { + when(blob.getSize()).thenReturn(FILE_SIZE); + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + null, + gcpProperties, + metricsContext); + + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void getLengthCached() { + 
GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + gcpProperties, + metricsContext); + + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void newStreamGcsAnalyticsCoreEnabled() throws IOException { + GCPProperties enabledGcpProperties = + new GCPProperties(ImmutableMap.of("gcs.analytics-core.enabled", "true")); + BlobId blobId = BlobId.fromGsUtilUri(LOCATION); + GcsItemId itemId = + GcsItemId.builder() + .setBucketName(blobId.getBucket()) + .setObjectName(blobId.getName()) + .build(); + GcsItemInfo itemInfo = GcsItemInfo.builder().setItemId(itemId).setSize(FILE_SIZE).build(); + GcsFileInfo gcsFileInfo = + GcsFileInfo.builder() + .setItemInfo(itemInfo) + .setUri(URI.create(LOCATION)) + .setAttributes(ImmutableMap.of()) + .build(); + try (MockedStatic<GoogleCloudStorageInputStream> mocked = + mockStatic(GoogleCloudStorageInputStream.class)) { + mocked + .when(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, gcsFileInfo)) + .thenReturn(mock(GoogleCloudStorageInputStream.class)); + + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + enabledGcpProperties, + metricsContext); + + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GoogleCloudStorageInputStreamWrapper.class); + } + } + } + + @Test + public void newStreamGcsAnalyticsCoreEnabledObjectSizeNull() throws IOException { + GCPProperties enabledGcpProperties = + new GCPProperties(ImmutableMap.of("gcs.analytics-core.enabled", "true")); + BlobId blobId = BlobId.fromGsUtilUri(LOCATION); + GcsItemId itemId = + GcsItemId.builder() + .setBucketName(blobId.getBucket()) + .setObjectName(blobId.getName()) + .build(); + try (MockedStatic<GoogleCloudStorageInputStream> mocked = + mockStatic(GoogleCloudStorageInputStream.class)) { + mocked + .when(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, 
itemId)) + .thenReturn(mock(GoogleCloudStorageInputStream.class)); + + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + null, + enabledGcpProperties, + metricsContext); + + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GoogleCloudStorageInputStreamWrapper.class); + } + } + } + + @Test + public void newStreamGcsAnalyticsCoreDisabled() throws IOException { + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + gcpProperties, + metricsContext); + + try (MockedConstruction<GCSInputStream> mocked = + mockConstruction( + GCSInputStream.class, + (mock, context) -> { + assertThat(context.arguments()).hasSize(5); + assertThat(context.arguments().get(0)).isEqualTo(storage); + assertThat(context.arguments().get(1)).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(context.arguments().get(2)).isEqualTo(FILE_SIZE); + assertThat(context.arguments().get(3)).isEqualTo(gcpProperties); + assertThat(context.arguments().get(4)).isEqualTo(metricsContext); + })) { + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GCSInputStream.class); + assertThat(mocked.constructed()).hasSize(1); + } + } + } + + @Test + public void newStream_analyticsCoreInitializationFailed() throws IOException { Review Comment: ```suggestion public void newStreamAnalyticsCoreInitializationFailed() throws IOException { ``` ########## gcp/src/test/java/org/apache/iceberg/gcp/gcs/TestGcsInputFile.java: ########## @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.gcp.gcs; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.mockConstruction; +import static org.mockito.Mockito.mockStatic; +import static org.mockito.Mockito.when; + +import com.google.cloud.gcs.analyticscore.client.GcsFileInfo; +import com.google.cloud.gcs.analyticscore.client.GcsFileSystem; +import com.google.cloud.gcs.analyticscore.client.GcsItemId; +import com.google.cloud.gcs.analyticscore.client.GcsItemInfo; +import com.google.cloud.gcs.analyticscore.core.GcsAnalyticsCoreOptions; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import com.google.cloud.storage.Blob; +import com.google.cloud.storage.BlobId; +import com.google.cloud.storage.Storage; +import java.io.IOException; +import java.net.URI; +import java.util.Collections; +import org.apache.iceberg.gcp.GCPProperties; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.MetricsContext; +import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.MockedConstruction; +import org.mockito.MockedStatic; + +public class TestGcsInputFile { + + private static final String TEST_BUCKET = 
"TEST_BUCKET"; + private static final String KEY = "file/path/a.dat"; + private static final String LOCATION = "gs://" + TEST_BUCKET + "/" + KEY; + private static final long FILE_SIZE = 1024L; + + private Storage storage; + private GcsFileSystem gcsFileSystem; + private GcsAnalyticsCoreOptions gcsAnalyticsCoreOptions; + private PrefixedStorage prefixedStorage; + private GCPProperties gcpProperties; + private MetricsContext metricsContext; + private Blob blob; + + @BeforeEach + public void before() { + storage = mock(Storage.class); + gcsFileSystem = mock(GcsFileSystem.class); + prefixedStorage = mock(PrefixedStorage.class); + gcsAnalyticsCoreOptions = new GcsAnalyticsCoreOptions("", Collections.emptyMap()); + gcpProperties = new GCPProperties(); + metricsContext = MetricsContext.nullMetrics(); + blob = mock(Blob.class); + when(prefixedStorage.storage()).thenReturn(storage); + when(prefixedStorage.gcsFileSystem()).thenReturn(gcsFileSystem); + when(prefixedStorage.gcpProperties()).thenReturn(gcpProperties); + when(gcsFileSystem.getFileSystemOptions()) + .thenReturn(gcsAnalyticsCoreOptions.getGcsFileSystemOptions()); + when(storage.get(any(BlobId.class))).thenReturn(blob); + when(blob.getSize()).thenReturn(FILE_SIZE); + } + + @Test + public void fromLocation() { + GCSInputFile inputFile = GCSInputFile.fromLocation(LOCATION, prefixedStorage, metricsContext); + + assertThat(inputFile.blobId()).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void fromLocationWithLength() { + GCSInputFile inputFile = + GCSInputFile.fromLocation(LOCATION, FILE_SIZE, prefixedStorage, metricsContext); + + assertThat(inputFile.blobId()).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void getLength() { + when(blob.getSize()).thenReturn(FILE_SIZE); + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + 
BlobId.fromGsUtilUri(LOCATION), + null, + gcpProperties, + metricsContext); + + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void getLengthCached() { + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + gcpProperties, + metricsContext); + + assertThat(inputFile.getLength()).isEqualTo(FILE_SIZE); + } + + @Test + public void newStreamGcsAnalyticsCoreEnabled() throws IOException { + GCPProperties enabledGcpProperties = + new GCPProperties(ImmutableMap.of("gcs.analytics-core.enabled", "true")); + BlobId blobId = BlobId.fromGsUtilUri(LOCATION); + GcsItemId itemId = + GcsItemId.builder() + .setBucketName(blobId.getBucket()) + .setObjectName(blobId.getName()) + .build(); + GcsItemInfo itemInfo = GcsItemInfo.builder().setItemId(itemId).setSize(FILE_SIZE).build(); + GcsFileInfo gcsFileInfo = + GcsFileInfo.builder() + .setItemInfo(itemInfo) + .setUri(URI.create(LOCATION)) + .setAttributes(ImmutableMap.of()) + .build(); + try (MockedStatic<GoogleCloudStorageInputStream> mocked = + mockStatic(GoogleCloudStorageInputStream.class)) { + mocked + .when(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, gcsFileInfo)) + .thenReturn(mock(GoogleCloudStorageInputStream.class)); + + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + enabledGcpProperties, + metricsContext); + + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GoogleCloudStorageInputStreamWrapper.class); + } + } + } + + @Test + public void newStreamGcsAnalyticsCoreEnabledObjectSizeNull() throws IOException { + GCPProperties enabledGcpProperties = + new GCPProperties(ImmutableMap.of("gcs.analytics-core.enabled", "true")); + BlobId blobId = BlobId.fromGsUtilUri(LOCATION); + GcsItemId itemId = + GcsItemId.builder() + .setBucketName(blobId.getBucket()) + .setObjectName(blobId.getName()) + .build(); + 
try (MockedStatic<GoogleCloudStorageInputStream> mocked = + mockStatic(GoogleCloudStorageInputStream.class)) { + mocked + .when(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, itemId)) + .thenReturn(mock(GoogleCloudStorageInputStream.class)); + + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + null, + enabledGcpProperties, + metricsContext); + + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GoogleCloudStorageInputStreamWrapper.class); + } + } + } + + @Test + public void newStreamGcsAnalyticsCoreDisabled() throws IOException { + GCSInputFile inputFile = + new GCSInputFile( + storage, + gcsFileSystem, + BlobId.fromGsUtilUri(LOCATION), + FILE_SIZE, + gcpProperties, + metricsContext); + + try (MockedConstruction<GCSInputStream> mocked = + mockConstruction( + GCSInputStream.class, + (mock, context) -> { + assertThat(context.arguments()).hasSize(5); + assertThat(context.arguments().get(0)).isEqualTo(storage); + assertThat(context.arguments().get(1)).isEqualTo(BlobId.fromGsUtilUri(LOCATION)); + assertThat(context.arguments().get(2)).isEqualTo(FILE_SIZE); + assertThat(context.arguments().get(3)).isEqualTo(gcpProperties); + assertThat(context.arguments().get(4)).isEqualTo(metricsContext); + })) { + try (SeekableInputStream stream = inputFile.newStream()) { + assertThat(stream).isInstanceOf(GCSInputStream.class); + assertThat(mocked.constructed()).hasSize(1); + } + } + } + + @Test + public void newStream_analyticsCoreInitializationFailed() throws IOException { + GCPProperties enabledGcpProperties = + new GCPProperties(ImmutableMap.of("gcs.analytics-core.enabled", "true")); + BlobId blobId = BlobId.fromGsUtilUri(LOCATION); + GcsItemId itemId = + GcsItemId.builder() + .setBucketName(blobId.getBucket()) + .setObjectName(blobId.getName()) + .build(); + + try (MockedStatic<GoogleCloudStorageInputStream> mocked = + mockStatic(GoogleCloudStorageInputStream.class)) { 
+ mocked + .when(() -> GoogleCloudStorageInputStream.create(gcsFileSystem, itemId)) + .thenThrow(new IOException("GCS connector failed")); Review Comment: shouldn't there be a check that verifies that this error is thrown? Something like `assertThatThrownBy(..).isInstanceOf(...).hasMessage(..)` ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp.gcs; + +import com.google.api.client.util.Preconditions; +import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; + +class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream implements RangeReadable { + private final Counter readBytesCounter; Review Comment: ```suggestion private final Counter readBytes; ``` ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp.gcs; + +import com.google.api.client.util.Preconditions; +import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; + +class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream implements RangeReadable { Review Comment: ```suggestion class GCSInputStreamWrapper extends SeekableInputStream implements RangeReadable { ``` ########## gcp/src/main/java/org/apache/iceberg/gcp/gcs/GoogleCloudStorageInputStreamWrapper.java: ########## @@ -0,0 +1,114 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.iceberg.gcp.gcs; + +import com.google.api.client.util.Preconditions; +import com.google.cloud.gcs.analyticscore.client.GcsObjectRange; +import com.google.cloud.gcs.analyticscore.core.GoogleCloudStorageInputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.function.IntFunction; +import java.util.stream.Collectors; +import org.apache.iceberg.io.FileIOMetricsContext; +import org.apache.iceberg.io.FileRange; +import org.apache.iceberg.io.RangeReadable; +import org.apache.iceberg.io.SeekableInputStream; +import org.apache.iceberg.metrics.Counter; +import org.apache.iceberg.metrics.MetricsContext; + +class GoogleCloudStorageInputStreamWrapper extends SeekableInputStream implements RangeReadable { + private final Counter readBytesCounter; + private final Counter readOperationsCounter; Review Comment: ```suggestion private final Counter readOperations; ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
