gemini-code-assist[bot] commented on code in PR #38873:
URL: https://github.com/apache/beam/pull/38873#discussion_r3396122898


##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import io.delta.kernel.engine.FileReadRequest;
+import io.delta.kernel.engine.FileSystemClient;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link 
FileSystems}. */
+public class BeamFileSystemClient implements FileSystemClient {
+  @Override
+  public CloseableIterator<FileStatus> listFrom(String path) throws 
IOException {
+    String glob = globForSiblings(path);
+    List<FileStatus> statuses = new ArrayList<>();
+    String normalizedInput = FileSystems.matchNewResource(path, 
false).toString();
+    for (MatchResult.Metadata metadata :
+      FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) {
+      if (metadata.resourceId().isDirectory()) {
+        continue;
+      }
+      String metadataPath = metadata.resourceId().toString();
+      if 
(normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput))
 >= 0) {
+        statuses.add(toDeltaFileStatus(metadata));
+      }
+    }
+    statuses.sort(
+        (first, second) ->
+            normalizeForOrdering(first.getPath())
+                .compareTo(normalizeForOrdering(second.getPath())));
+    return closeableIterator(statuses.iterator());
+  }
+
+  @Override
+  public String resolvePath(String path) throws IOException {
+    try {
+      return getFileStatus(path).getPath();
+    } catch (IOException e) {
+      return FileSystems.matchNewResource(path, false).toString();
+    }
+  }
+
+  @Override
+  public CloseableIterator<ByteArrayInputStream> readFiles(
+      CloseableIterator<FileReadRequest> readRequests) {
+    return new CloseableIterator<ByteArrayInputStream>() {
+      @Override
+      public boolean hasNext() {
+        return readRequests.hasNext();
+      }
+
+      @Override
+      public ByteArrayInputStream next() {
+        FileReadRequest request = readRequests.next();
+        try {
+          return readRange(request.getPath(), request.getStartOffset(), 
request.getReadLength());
+        } catch (IOException e) {
+          throw new UncheckedIOException(
+              String.format(
+                  "IOException reading from file %s at offset %s size %s",
+                  request.getPath(), request.getStartOffset(), 
request.getReadLength()),
+              e);
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        readRequests.close();
+      }
+    };
+  }
+
+  @Override
+  public boolean mkdirs(String path) throws IOException {
+    if (isLocalPath(path)) {
+      Files.createDirectories(toLocalPath(path));
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(String path) throws IOException {
+    FileSystems.delete(
+        Collections.singletonList(FileSystems.matchNewResource(path, false)),
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    return true;
+  }
+
+  @Override
+  public FileStatus getFileStatus(String path) throws IOException {
+    return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path));
+  }
+
+  @Override
+  public void copyFileAtomically(String src, String dst, boolean overwrite) 
throws IOException {
+    ResourceId srcResource = FileSystems.matchNewResource(src, false);
+    ResourceId dstResource = FileSystems.matchNewResource(dst, false);
+    MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW);
+    if (!overwrite
+        && dstMatch.status() == MatchResult.Status.OK
+        && !dstMatch.metadata().isEmpty()) {
+      throw new IOException("Destination already exists: " + dst);
+    }
+
+    if (overwrite) {
+      FileSystems.copy(
+          Collections.singletonList(srcResource), 
Collections.singletonList(dstResource));
+    } else {
+      FileSystems.copy(
+          Collections.singletonList(srcResource),
+          Collections.singletonList(dstResource),
+          MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+    }
+  }

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   `FileSystems.copy` is declared with a trailing `MoveOptions...` varargs 
parameter, so this three-argument call does compile. However, passing 
`MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS` adds little here: 
you already perform a pre-check that throws an exception if the destination 
exists and `overwrite` is false, so you can use the standard two-argument 
`FileSystems.copy` call for both cases.
   
   ```suggestion
       FileSystems.copy(
           Collections.singletonList(srcResource), 
Collections.singletonList(dstResource));
   ```



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import io.delta.kernel.engine.FileReadRequest;
+import io.delta.kernel.engine.FileSystemClient;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link 
FileSystems}. */
+public class BeamFileSystemClient implements FileSystemClient {
+  @Override
+  public CloseableIterator<FileStatus> listFrom(String path) throws 
IOException {
+    String glob = globForSiblings(path);
+    List<FileStatus> statuses = new ArrayList<>();
+    String normalizedInput = FileSystems.matchNewResource(path, 
false).toString();
+    for (MatchResult.Metadata metadata :
+      FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) {
+      if (metadata.resourceId().isDirectory()) {
+        continue;
+      }
+      String metadataPath = metadata.resourceId().toString();
+      if 
(normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput))
 >= 0) {
+        statuses.add(toDeltaFileStatus(metadata));
+      }
+    }
+    statuses.sort(
+        (first, second) ->
+            normalizeForOrdering(first.getPath())
+                .compareTo(normalizeForOrdering(second.getPath())));
+    return closeableIterator(statuses.iterator());
+  }
+
+  @Override
+  public String resolvePath(String path) throws IOException {
+    try {
+      return getFileStatus(path).getPath();
+    } catch (IOException e) {
+      return FileSystems.matchNewResource(path, false).toString();
+    }
+  }
+
+  @Override
+  public CloseableIterator<ByteArrayInputStream> readFiles(
+      CloseableIterator<FileReadRequest> readRequests) {
+    return new CloseableIterator<ByteArrayInputStream>() {
+      @Override
+      public boolean hasNext() {
+        return readRequests.hasNext();
+      }
+
+      @Override
+      public ByteArrayInputStream next() {
+        FileReadRequest request = readRequests.next();
+        try {
+          return readRange(request.getPath(), request.getStartOffset(), 
request.getReadLength());
+        } catch (IOException e) {
+          throw new UncheckedIOException(
+              String.format(
+                  "IOException reading from file %s at offset %s size %s",
+                  request.getPath(), request.getStartOffset(), 
request.getReadLength()),
+              e);
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        readRequests.close();
+      }
+    };
+  }
+
+  @Override
+  public boolean mkdirs(String path) throws IOException {
+    if (isLocalPath(path)) {
+      Files.createDirectories(toLocalPath(path));
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(String path) throws IOException {
+    FileSystems.delete(
+        Collections.singletonList(FileSystems.matchNewResource(path, false)),
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    return true;
+  }
+
+  @Override
+  public FileStatus getFileStatus(String path) throws IOException {
+    return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path));
+  }
+
+  @Override
+  public void copyFileAtomically(String src, String dst, boolean overwrite) 
throws IOException {
+    ResourceId srcResource = FileSystems.matchNewResource(src, false);
+    ResourceId dstResource = FileSystems.matchNewResource(dst, false);
+    MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW);
+    if (!overwrite
+        && dstMatch.status() == MatchResult.Status.OK
+        && !dstMatch.metadata().isEmpty()) {
+      throw new IOException("Destination already exists: " + dst);
+    }
+
+    if (overwrite) {
+      FileSystems.copy(
+          Collections.singletonList(srcResource), 
Collections.singletonList(dstResource));
+    } else {
+      FileSystems.copy(
+          Collections.singletonList(srcResource),
+          Collections.singletonList(dstResource),
+          MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+    }
+  }
+
+  private static ByteArrayInputStream readRange(String path, int startOffset, 
int readLength)
+    throws IOException {
+    ResourceId resourceId = FileSystems.matchNewResource(path, false);
+    try (ReadableByteChannel channel = FileSystems.open(resourceId)) {
+      byte[] data = new byte[readLength];
+      if (channel instanceof SeekableByteChannel) {
+        ((SeekableByteChannel) channel).position(startOffset);
+        readFully(channel, ByteBuffer.wrap(data));
+      } else {
+        try (InputStream stream = Channels.newInputStream(channel)) {
+          com.google.common.io.ByteStreams.skipFully(stream, startOffset);
+          com.google.common.io.ByteStreams.readFully(stream, data);
+        }
+      }
+      return new ByteArrayInputStream(data);
+    }
+  }
+
+  private static void readFully(ReadableByteChannel channel, ByteBuffer 
buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      if (channel.read(buffer) < 0) {
+        throw new EOFException("Unexpected end of file");
+      }
+    }
+  }
+
+  private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) {
+    return FileStatus.of(
+        metadata.resourceId().toString(), metadata.sizeBytes(), 
metadata.lastModifiedMillis());
+  }
+
+  private static String globForSiblings(String path) {
+    String normalized = path.replace('\\', '/');
+    int lastSlash = normalized.lastIndexOf('/');
+    if (lastSlash < 0) {
+      return "*";
+    }
+    return path.substring(0, lastSlash + 1) + "*";
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Using `path.substring` in `globForSiblings` can preserve backslashes on 
Windows platforms, which may cause issues with glob pattern matching in some 
filesystems. It is safer to use the `normalized` string (which has forward 
slashes) to construct the glob pattern.
   
   ```suggestion
     /**
      * Builds a glob that matches every entry in the same directory as {@code path}.
      *
      * <p>Backslashes are converted to forward slashes first, so the returned pattern only ever
      * contains forward slashes. A path without any separator yields the bare pattern {@code *}.
      */
     private static String globForSiblings(String path) {
       String forwardSlashed = path.replace('\\', '/');
       // Index just past the last separator; 0 means the path has no directory part.
       int parentEnd = forwardSlashed.lastIndexOf('/') + 1;
       return parentEnd == 0 ? "*" : forwardSlashed.substring(0, parentEnd) + "*";
     }
   ```



##########
sdks/java/io/delta/src/main/java/org/apache/beam/sdk/io/delta/BeamFileSystemClient.java:
##########
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.delta;
+
+import io.delta.kernel.engine.FileReadRequest;
+import io.delta.kernel.engine.FileSystemClient;
+import io.delta.kernel.utils.CloseableIterator;
+import io.delta.kernel.utils.FileStatus;
+import java.io.ByteArrayInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.UncheckedIOException;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.channels.SeekableByteChannel;
+import java.nio.file.Files;
+import java.nio.file.Path;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Locale;
+import java.util.NoSuchElementException;
+import org.apache.beam.sdk.io.FileSystems;
+import org.apache.beam.sdk.io.fs.EmptyMatchTreatment;
+import org.apache.beam.sdk.io.fs.MatchResult;
+import org.apache.beam.sdk.io.fs.MoveOptions;
+import org.apache.beam.sdk.io.fs.ResourceId;
+
+/** A Delta Kernel {@link FileSystemClient} backed by Beam's {@link 
FileSystems}. */
+public class BeamFileSystemClient implements FileSystemClient {
+  @Override
+  public CloseableIterator<FileStatus> listFrom(String path) throws 
IOException {
+    String glob = globForSiblings(path);
+    List<FileStatus> statuses = new ArrayList<>();
+    String normalizedInput = FileSystems.matchNewResource(path, 
false).toString();
+    for (MatchResult.Metadata metadata :
+      FileSystems.match(glob, EmptyMatchTreatment.ALLOW).metadata()) {
+      if (metadata.resourceId().isDirectory()) {
+        continue;
+      }
+      String metadataPath = metadata.resourceId().toString();
+      if 
(normalizeForOrdering(metadataPath).compareTo(normalizeForOrdering(normalizedInput))
 >= 0) {
+        statuses.add(toDeltaFileStatus(metadata));
+      }
+    }
+    statuses.sort(
+        (first, second) ->
+            normalizeForOrdering(first.getPath())
+                .compareTo(normalizeForOrdering(second.getPath())));
+    return closeableIterator(statuses.iterator());
+  }
+
+  @Override
+  public String resolvePath(String path) throws IOException {
+    try {
+      return getFileStatus(path).getPath();
+    } catch (IOException e) {
+      return FileSystems.matchNewResource(path, false).toString();
+    }
+  }
+
+  @Override
+  public CloseableIterator<ByteArrayInputStream> readFiles(
+      CloseableIterator<FileReadRequest> readRequests) {
+    return new CloseableIterator<ByteArrayInputStream>() {
+      @Override
+      public boolean hasNext() {
+        return readRequests.hasNext();
+      }
+
+      @Override
+      public ByteArrayInputStream next() {
+        FileReadRequest request = readRequests.next();
+        try {
+          return readRange(request.getPath(), request.getStartOffset(), 
request.getReadLength());
+        } catch (IOException e) {
+          throw new UncheckedIOException(
+              String.format(
+                  "IOException reading from file %s at offset %s size %s",
+                  request.getPath(), request.getStartOffset(), 
request.getReadLength()),
+              e);
+        }
+      }
+
+      @Override
+      public void close() throws IOException {
+        readRequests.close();
+      }
+    };
+  }
+
+  @Override
+  public boolean mkdirs(String path) throws IOException {
+    if (isLocalPath(path)) {
+      Files.createDirectories(toLocalPath(path));
+    }
+    return true;
+  }
+
+  @Override
+  public boolean delete(String path) throws IOException {
+    FileSystems.delete(
+        Collections.singletonList(FileSystems.matchNewResource(path, false)),
+        MoveOptions.StandardMoveOptions.IGNORE_MISSING_FILES);
+    return true;
+  }
+
+  @Override
+  public FileStatus getFileStatus(String path) throws IOException {
+    return toDeltaFileStatus(FileSystems.matchSingleFileSpec(path));
+  }
+
+  @Override
+  public void copyFileAtomically(String src, String dst, boolean overwrite) 
throws IOException {
+    ResourceId srcResource = FileSystems.matchNewResource(src, false);
+    ResourceId dstResource = FileSystems.matchNewResource(dst, false);
+    MatchResult dstMatch = FileSystems.match(dst, EmptyMatchTreatment.ALLOW);
+    if (!overwrite
+        && dstMatch.status() == MatchResult.Status.OK
+        && !dstMatch.metadata().isEmpty()) {
+      throw new IOException("Destination already exists: " + dst);
+    }
+
+    if (overwrite) {
+      FileSystems.copy(
+          Collections.singletonList(srcResource), 
Collections.singletonList(dstResource));
+    } else {
+      FileSystems.copy(
+          Collections.singletonList(srcResource),
+          Collections.singletonList(dstResource),
+          MoveOptions.StandardMoveOptions.SKIP_IF_DESTINATION_EXISTS);
+    }
+  }
+
+  private static ByteArrayInputStream readRange(String path, int startOffset, 
int readLength)
+    throws IOException {
+    ResourceId resourceId = FileSystems.matchNewResource(path, false);
+    try (ReadableByteChannel channel = FileSystems.open(resourceId)) {
+      byte[] data = new byte[readLength];
+      if (channel instanceof SeekableByteChannel) {
+        ((SeekableByteChannel) channel).position(startOffset);
+        readFully(channel, ByteBuffer.wrap(data));
+      } else {
+        try (InputStream stream = Channels.newInputStream(channel)) {
+          com.google.common.io.ByteStreams.skipFully(stream, startOffset);
+          com.google.common.io.ByteStreams.readFully(stream, data);
+        }
+      }
+      return new ByteArrayInputStream(data);
+    }
+  }
+
+  private static void readFully(ReadableByteChannel channel, ByteBuffer 
buffer) throws IOException {
+    while (buffer.hasRemaining()) {
+      if (channel.read(buffer) < 0) {
+        throw new EOFException("Unexpected end of file");
+      }
+    }
+  }
+
+  private static FileStatus toDeltaFileStatus(MatchResult.Metadata metadata) {
+    return FileStatus.of(
+        metadata.resourceId().toString(), metadata.sizeBytes(), 
metadata.lastModifiedMillis());
+  }
+
+  private static String globForSiblings(String path) {
+    String normalized = path.replace('\\', '/');
+    int lastSlash = normalized.lastIndexOf('/');
+    if (lastSlash < 0) {
+      return "*";
+    }
+    return path.substring(0, lastSlash + 1) + "*";
+  }
+
+  private static String normalizeForOrdering(String path) {
+    return path.replace('\\', '/');
+  }
+
+  private static boolean isLocalPath(String path) {
+    int schemeSeparator = path.indexOf(':');
+    if (schemeSeparator < 0) {
+      return true;
+    }
+    String scheme = path.substring(0, 
schemeSeparator).toLowerCase(Locale.ROOT);
+    return scheme.length() == 1 || "file".equals(scheme);
+  }
+
+  private static Path toLocalPath(String path) {
+    if (path.toLowerCase(Locale.ROOT).startsWith("file:")) {
+      return Paths.get(URI.create(path));
+    }
+    return Paths.get(path);
+  }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   Calling `URI.create(path)` directly on a `file:` URI can throw an 
`IllegalArgumentException` if the path contains unescaped spaces or special 
characters (which are common in local file paths). Adding a try-catch block 
with a fallback that strips the `file:` prefix makes the path resolution more 
robust.
   
   ```java
     /**
      * Converts a plain local path or a {@code file:} URI into a {@link Path}.
      *
      * <p>A well-formed {@code file:} URI is resolved through {@link Paths#get(URI)}. If the string
      * is not a valid URI (for example because it contains unescaped spaces), the scheme is
      * stripped by hand. The root slash is preserved so that an absolute path stays absolute.
      *
      * @param path a filesystem path or a {@code file:} URI
      * @return the corresponding local {@link Path}
      */
     private static Path toLocalPath(String path) {
       if (!path.toLowerCase(Locale.ROOT).startsWith("file:")) {
         return Paths.get(path);
       }
       try {
         return Paths.get(new URI(path));
       } catch (java.net.URISyntaxException | IllegalArgumentException e) {
         // Malformed URI or one Paths.get(URI) rejects: fall back to manual scheme stripping.
         String stripped = path.substring("file:".length());
         if (stripped.startsWith("///")) {
           // Drop only the empty-authority "//"; the third slash is the filesystem root.
           stripped = stripped.substring(2);
         }
         boolean hasDriveLetter =
             stripped.length() >= 3
                 && stripped.charAt(0) == '/'
                 && Character.isLetter(stripped.charAt(1))
                 && stripped.charAt(2) == ':';
         if (hasDriveLetter && java.io.File.separatorChar == '\\') {
           // On Windows "/C:/dir" is not a valid path; the drive letter must come first.
           stripped = stripped.substring(1);
         }
         return Paths.get(stripped);
       }
     }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to