gemini-code-assist[bot] commented on code in PR #38873: URL: https://github.com/apache/beam/pull/38873#discussion_r3396122898
########## sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import 
org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. */ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator<FileStatus> listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List<FileStatus> statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + if (metadata.resourceId().isDirectory()) { + continue; + } + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } + statuses.sort( + (first, second) -> + normalizeForOrdering(first.getPath()) + .compareTo(normalizeForOrdering(second.getPath()))); + return closeableIterator(statuses.iterator()); + } + + @Override + public String resolvePath(String path) throws IOException { + try { + return getFileStatus(path).getPath(); + } catch (IOException e) { + return FileSystems.matchNewResource(path, false).toString(); + } + } + + @Override + public CloseableIterator<ByteArrayInputStream> readFiles( + CloseableIterator<FileReadRequest> readRequests) { + return new CloseableIterator<ByteArrayInputStream>() { + @Override + public boolean hasNext() { + return readRequests.hasNext(); + } + + @Override + public ByteArrayInputStream next() { + FileReadRequest request = readRequests.next(); + try { + return readRange(request.getPath(), request.getStartOffset(), request.getReadLength()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format( + "IOException reading from file %s at offset %s size %s", + request.getPath(), request.getStartOffset(), request.getReadLength()), + e); + } + } + + @Override + public void close() throws IOException { + 
readRequests.close(); + } + }; + } + + @Override + public boolean mkdirs(String path) throws IOException { + if (isLocalPath(path)) { + Files.createDirectories(toLocalPath(path)); + } + return true; + } + + @Override + public boolean delete(String path) throws IOException { + FileSystems.delete( + Collections.singletonList(FileSystems.matchNewResource(path, false)), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + return true; + } + + @Override + public FileStatus getFileStatus(String path) throws IOException { + return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path)); + } + + @Override + public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException { + ResourceId srcResource = FileSystems.matchNewResource(src, false); + ResourceId dstResource = FileSystems.matchNewResource(dst, false); + MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW); + if (!overwrite + && dstMatch.status() == MatchResult.Status.OK + && !dstMatch.metadata().isEmpty()) { + throw new IOException("Destination already exists: " + dst); + } + + if (overwrite) { + FileSystems.copy( + Collections.singletonList(srcResource), Collections.singletonList(dstResource)); + } else { + FileSystems.copy( + Collections.singletonList(srcResource), + Collections.singletonList(dstResource), + MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + } + } Review Comment:  The three-argument overload of `FileSystems.copy` does not exist in Apache Beam. Passing `MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS` as a third argument will cause a compilation error. Since you already perform a pre-check to throw an exception if the destination exists and `overwrite` is false, you can safely use the standard two-argument `FileSystems.copy` method for both cases. 
```suggestion FileSystems.copy( Collections.singletonList(srcResource), Collections.singletonList(dstResource)); ``` Reviewer note: the premise above is inaccurate. Beam declares `FileSystems.copy(List<ResourceId> srcResourceIds, List<ResourceId> destResourceIds, MoveOptions... moveOptions)`, so the original three-argument call compiles. Dropping the option is still a reasonable simplification given the explicit pre-check. However, neither variant makes the check-then-copy sequence atomic: a destination created between `FileSystems.match` and `FileSystems.copy` is not detected by the pre-check. ########## sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. 
*/ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator<FileStatus> listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List<FileStatus> statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + if (metadata.resourceId().isDirectory()) { + continue; + } + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } + statuses.sort( + (first, second) -> + normalizeForOrdering(first.getPath()) + .compareTo(normalizeForOrdering(second.getPath()))); + return closeableIterator(statuses.iterator()); + } + + @Override + public String resolvePath(String path) throws IOException { + try { + return getFileStatus(path).getPath(); + } catch (IOException e) { + return FileSystems.matchNewResource(path, false).toString(); + } + } + + @Override + public CloseableIterator<ByteArrayInputStream> readFiles( + CloseableIterator<FileReadRequest> readRequests) { + return new CloseableIterator<ByteArrayInputStream>() { + @Override + public boolean hasNext() { + return readRequests.hasNext(); + } + + @Override + public ByteArrayInputStream next() { + FileReadRequest request = readRequests.next(); + try { + return readRange(request.getPath(), request.getStartOffset(), request.getReadLength()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format( + "IOException reading from file %s at offset %s size %s", + request.getPath(), request.getStartOffset(), request.getReadLength()), + e); + } + } + + @Override + public void close() throws IOException { + readRequests.close(); + } + }; + } + + @Override + public boolean mkdirs(String path) throws IOException { + if (isLocalPath(path)) { + 
Files.createDirectories(toLocalPath(path)); + } + return true; + } + + @Override + public boolean delete(String path) throws IOException { + FileSystems.delete( + Collections.singletonList(FileSystems.matchNewResource(path, false)), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + return true; + } + + @Override + public FileStatus getFileStatus(String path) throws IOException { + return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path)); + } + + @Override + public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException { + ResourceId srcResource = FileSystems.matchNewResource(src, false); + ResourceId dstResource = FileSystems.matchNewResource(dst, false); + MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW); + if (!overwrite + && dstMatch.status() == MatchResult.Status.OK + && !dstMatch.metadata().isEmpty()) { + throw new IOException("Destination already exists: " + dst); + } + + if (overwrite) { + FileSystems.copy( + Collections.singletonList(srcResource), Collections.singletonList(dstResource)); + } else { + FileSystems.copy( + Collections.singletonList(srcResource), + Collections.singletonList(dstResource), + MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + } + } + + private static ByteArrayInputStream readRange(String path, int startOffset, int readLength) + throws IOException { + ResourceId resourceId = FileSystems.matchNewResource(path, false); + try (ReadableByteChannel channel = FileSystems.open(resourceId)) { + byte[] data = new byte[readLength]; + if (channel instanceof SeekableByteChannel) { + ((SeekableByteChannel) channel).position(startOffset); + readFully(channel, ByteBuffer.wrap(data)); + } else { + try (InputStream stream = Channels.newInputStream(channel)) { + com.google.common.io.ByteStreams.skipFully(stream, startOffset); + com.google.common.io.ByteStreams.readFully(stream, data); + } + } + return new ByteArrayInputStream(data); + } + } + + private static void 
readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + if (channel.read(buffer) < 0) { + throw new EOFException("Unexpected end of file"); + } + } + } + + private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) { + return FileStatus.of( + metadata.resourceId().toString(), metadata.sizeBytes(), metadata.lastModifiedMillis()); + } + + private static String globForSiblings(String path) { + String normalized = path.replace('\\', '/'); + int lastSlash = normalized.lastIndexOf('/'); + if (lastSlash < 0) { + return "*"; + } + return path.substring(0, lastSlash + 1) + "*"; + } Review Comment:  Using `path.substring` in `globForSiblings` can preserve backslashes on Windows platforms, which may cause issues with glob pattern matching in some filesystems. It is safer to use the `normalized` string (which has forward slashes) to construct the glob pattern. ```suggestion private static String globForSiblings(String path) { String normalized = path.replace('\\', '/'); int lastSlash = normalized.lastIndexOf('/'); if (lastSlash < 0) { return "*"; } return normalized.substring(0, lastSlash + 1) + "*"; } ``` ########## sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java: ########## @@ -0,0 +1,234 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.delta; + +import io.delta.kernel.engine.FileReadRequest; +import io.delta.kernel.engine.FileSystemClient; +import io.delta.kernel.utils.CloseableIterator; +import io.delta.kernel.utils.FileStatus; +import java.io.ByteArrayInputStream; +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.UncheckedIOException; +import java.net.URI; +import java.nio.ByteBuffer; +import java.nio.channels.Channels; +import java.nio.channels.ReadableByteChannel; +import java.nio.channels.SeekableByteChannel; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.util.ArrayList; +import java.util.Collections; +import java.util.Iterator; +import java.util.List; +import java.util.Locale; +import java.util.NoSuchElementException; +import org.apache.beam.sdk.io.FileSystems; +import org.apache.beam.sdk.io.fs.EmptyMatchTreatment; +import org.apache.beam.sdk.io.fs.MatchResult; +import org.apache.beam.sdk.io.fs.MoveOptions; +import org.apache.beam.sdk.io.fs.ResourceId; + +/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link FileSystems}. 
*/ +public class BeamFileSystemClient implements FileSystemClient { + @Override + public CloseableIterator<FileStatus> listFrom(String path) throws IOException { + String glob = globForSiblings(path); + List<FileStatus> statuses = new ArrayList<>(); + String normalizedInput = FileSystems.matchNewResource(path, false).toString(); + for (MatchResult.Metadata metadata : + FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) { + if (metadata.resourceId().isDirectory()) { + continue; + } + String metadataPath = metadata.resourceId().toString(); + if (normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput)) >= 0) { + statuses.add(toDeltaFileStatus(metadata)); + } + } + statuses.sort( + (first, second) -> + normalizeForOrdering(first.getPath()) + .compareTo(normalizeForOrdering(second.getPath()))); + return closeableIterator(statuses.iterator()); + } + + @Override + public String resolvePath(String path) throws IOException { + try { + return getFileStatus(path).getPath(); + } catch (IOException e) { + return FileSystems.matchNewResource(path, false).toString(); + } + } + + @Override + public CloseableIterator<ByteArrayInputStream> readFiles( + CloseableIterator<FileReadRequest> readRequests) { + return new CloseableIterator<ByteArrayInputStream>() { + @Override + public boolean hasNext() { + return readRequests.hasNext(); + } + + @Override + public ByteArrayInputStream next() { + FileReadRequest request = readRequests.next(); + try { + return readRange(request.getPath(), request.getStartOffset(), request.getReadLength()); + } catch (IOException e) { + throw new UncheckedIOException( + String.format( + "IOException reading from file %s at offset %s size %s", + request.getPath(), request.getStartOffset(), request.getReadLength()), + e); + } + } + + @Override + public void close() throws IOException { + readRequests.close(); + } + }; + } + + @Override + public boolean mkdirs(String path) throws IOException { + if (isLocalPath(path)) { + 
Files.createDirectories(toLocalPath(path)); + } + return true; + } + + @Override + public boolean delete(String path) throws IOException { + FileSystems.delete( + Collections.singletonList(FileSystems.matchNewResource(path, false)), + MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES); + return true; + } + + @Override + public FileStatus getFileStatus(String path) throws IOException { + return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path)); + } + + @Override + public void copyFileAtomically(String src, String dst, boolean overwrite) throws IOException { + ResourceId srcResource = FileSystems.matchNewResource(src, false); + ResourceId dstResource = FileSystems.matchNewResource(dst, false); + MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW); + if (!overwrite + && dstMatch.status() == MatchResult.Status.OK + && !dstMatch.metadata().isEmpty()) { + throw new IOException("Destination already exists: " + dst); + } + + if (overwrite) { + FileSystems.copy( + Collections.singletonList(srcResource), Collections.singletonList(dstResource)); + } else { + FileSystems.copy( + Collections.singletonList(srcResource), + Collections.singletonList(dstResource), + MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS); + } + } + + private static ByteArrayInputStream readRange(String path, int startOffset, int readLength) + throws IOException { + ResourceId resourceId = FileSystems.matchNewResource(path, false); + try (ReadableByteChannel channel = FileSystems.open(resourceId)) { + byte[] data = new byte[readLength]; + if (channel instanceof SeekableByteChannel) { + ((SeekableByteChannel) channel).position(startOffset); + readFully(channel, ByteBuffer.wrap(data)); + } else { + try (InputStream stream = Channels.newInputStream(channel)) { + com.google.common.io.ByteStreams.skipFully(stream, startOffset); + com.google.common.io.ByteStreams.readFully(stream, data); + } + } + return new ByteArrayInputStream(data); + } + } + + private static void 
readFully(ReadableByteChannel channel, ByteBuffer buffer) throws IOException { + while (buffer.hasRemaining()) { + if (channel.read(buffer) < 0) { + throw new EOFException("Unexpected end of file"); + } + } + } + + private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) { + return FileStatus.of( + metadata.resourceId().toString(), metadata.sizeBytes(), metadata.lastModifiedMillis()); + } + + private static String globForSiblings(String path) { + String normalized = path.replace('\\', '/'); + int lastSlash = normalized.lastIndexOf('/'); + if (lastSlash < 0) { + return "*"; + } + return path.substring(0, lastSlash + 1) + "*"; + } + + private static String normalizeForOrdering(String path) { + return path.replace('\\', '/'); + } + + private static boolean isLocalPath(String path) { + int schemeSeparator = path.indexOf(':'); + if (schemeSeparator < 0) { + return true; + } + String scheme = path.substring(0, schemeSeparator).toLowerCase(Locale.ROOT); + return scheme.length() == 1 || "file".equals(scheme); + } + + private static Path toLocalPath(String path) { + if (path.toLowerCase(Locale.ROOT).startsWith("file:")) { + return Paths.get(URI.create(path)); + } + return Paths.get(path); + } Review Comment:  Calling `URI.create(path)` directly on a `file:` URI can throw an `IllegalArgumentException` if the path contains unescaped spaces or special characters (which are common in local file paths). Adding a try-catch block with a fallback that strips the `file:` prefix makes the path resolution more robust. ```java private static Path toLocalPath(String path) { if (path.toLowerCase(Locale.ROOT).startsWith("file:")) { try { return Paths.get(new URI(path)); } catch (Exception e) { String schemeStripped = path.substring(5); if (schemeStripped.startsWith("///")) { schemeStripped = schemeStripped.substring(3); } return Paths.get(schemeStripped); } } return Paths.get(path); } ``` Reviewer note: the suggested fallback turns absolute POSIX paths into relative ones. For `file:///tmp/a b`, `substring(5)` yields `///tmp/a b`, and `substring(3)` then yields `tmp/a b`. Keep the leading `/` (strip only `//`), and strip all three slashes only when a Windows drive letter such as `C:` follows. Also, `catch (Exception e)` is broader than needed. `new URI(...)` throws `URISyntaxException`, and `Paths.get(URI)` signals bad input with `IllegalArgumentException`, so a multi-catch of those two is sufficient. -- This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
