gemini-code-assist[bot] commented on code in PR #38873: URL: https://github.com/apache/beam/pull/38873#discussion_r3382722269
########## sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import 
org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. */ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator<FileStatus> listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List<FileStatus> statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(path)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } Review Comment:  The `normalizedInput` variable is defined on line 55 but never used. Instead, the raw `path` parameter is used in the comparison on line 60. This can lead to incorrect filtering in `listFrom` if the input path and the matched metadata paths have different normalization formats (e.g., `file:/` vs `file:///`). Additionally, `listFrom` should filter out directories to prevent downstream components from attempting to read directories as files. We can achieve this by checking `!metadata.resourceId().isDirectory()`. ```java for (MatchResult.Metadata metadata : FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { if (metadata.resourceId().isDirectory()) { continue; } String metadataPath = metadata.resourceId().toString(); if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput)) >= 0) { statuses.add(toDeltaFileStatus(metadata)); } } ``` ########## sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java: ########## @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. 
*/ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator<FileStatus> listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List<FileStatus> statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(path)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } + statuses.sort( + (first, second) -> + normalizeForOrdering(first.getPath()) + .compareTo(normalizeForOrdering(second.getPath()))); + return closeableIterator(statuses.iterator()); + } + + @Override + public String resolvePath(String path) throws IOException { + try { + return getFileStatus(path).getPath(); + } catch (IOException e) { + return FileSystems.matchNewResource(path, false).toString(); + } + } + + @Override + public CloseableIterator<ByteArrayInputStream> readFiles( + CloseableIterator<FileReadRequest> readRequests) { + return new CloseableIterator<ByteArrayInputStream>() { + @Override + public boolean hasNext() { + return readRequests.hasNext(); + } + + @Override + public ByteArrayInputStream next() { + FileReadRequest request = readRequests.next(); + try { + return readRange(request.getPath(), request.getStartOffset(), request.getReadLength()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format( + "IOException reading from file %s at offset %s size %s", + request.getPath(), request.getStartOffset(), request.getReadLength()), + e); + } + } + + @Override + public void close() throws IOException { + readRequests.close(); + } + }; + } + + @Override + public boolean mkdirs(String path) throws IOException { + if (isLocalPath(path)) { + Files.createDirectories(toLocalPath(path)); + } + return true; + } + + 
@Override + public boolean delete(String path) throws IOException { + FileSystems.delete( + Collections.singletonList(FileSystems.matchNewResource(path, false)), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + return true; + } + + @Override + public FileStatus getFileStatus(String path) throws IOException { + return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path)); + } + + @Override + public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException { + ResourceId srcResource = FileSystems.matchNewResource(src, false); + ResourceId dstResource = FileSystems.matchNewResource(dst, false); + MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW); + if (!overwrite + && dstMatch.status() == MatchResult.Status.OK + && !dstMatch.metadata().isEmpty()) { + throw new IOException("Destination already exists: " + dst); + } + + if (overwrite) { + FileSystems.copy( + Collections.singletonList(srcResource), Collections.singletonList(dstResource)); + } else { + FileSystems.copy( + Collections.singletonList(srcResource), + Collections.singletonList(dstResource), + MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + } + } + + private static ByteArrayInputStream readRange(String path, int startOffset, int readLength) + throws IOException { + ResourceId resourceId = FileSystems.matchNewResource(path, false); + try (ReadableByteChannel channel = FileSystems.open(resourceId)) { + byte[] data = new byte[readLength]; + if (channel instanceof SeekableByteChannel) { + ((SeekableByteChannel) channel).position(startOffset); + readFully(channel, ByteBuffer.wrap(data)); + } else { + try (InputStream stream = Channels.newInputStream(channel)) { + stream.skipNBytes(startOffset); + int read = stream.readNBytes(data, 0, readLength); + if (read != readLength) { + throw new EOFException( + String.format("Expected %s bytes from %s but read %s", readLength, path, read)); + } + } + } Review Comment:  The methods 
`InputStream.skipNBytes` (introduced in Java 12) and `InputStream.readNBytes(byte[], int, int)` (introduced in Java 9) are used here. Since Apache Beam supports Java 8 and Java 11 as target runtimes, these calls will cause `NoSuchMethodError` at runtime on those platforms: neither method exists on Java 8, and `skipNBytes` does not exist on Java 11. We can use Guava's `ByteStreams.skipFully` and `ByteStreams.readFully` instead, which are fully compatible with Java 8/11 and are already available on the classpath (in Beam, import them from the vendored Guava package rather than from `com.google.common` directly). ```java } else { try (InputStream stream = Channels.newInputStream(channel)) { com.google.common.io.ByteStreams.skipFully(stream, startOffset); com.google.common.io.ByteStreams.readFully(stream, data); } } ``` -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
