RussellSpitzer commented on code in PR #15239:
URL: https://github.com/apache/iceberg/pull/15239#discussion_r2854689938


##########
spark/v4.1/spark/src/main/java/org/apache/iceberg/spark/source/SparkMicroBatchStream.java:
##########
@@ -247,52 +267,147 @@ public void prepareForTriggerAvailableNow() {
     }
   }
 
-  private static class InitialOffsetStore {
+  private abstract static class BaseOffsetStore {
+    private static final Joiner SLASH = Joiner.on("/");
+
     private final Table table;
-    private final FileIO io;
     private final String initialOffsetLocation;
     private final Long fromTimestamp;
 
-    InitialOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
+    BaseOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
       this.table = table;
-      this.io = table.io();
       this.initialOffsetLocation = SLASH.join(checkpointLocation, "offsets/0");
       this.fromTimestamp = fromTimestamp;
     }
 
+    protected Table table() {
+      return table;
+    }
+
+    protected String initialOffsetLocation() {
+      return initialOffsetLocation;
+    }
+
+    protected Long fromTimestamp() {
+      return fromTimestamp;
+    }
+
     public StreamingOffset initialOffset() {
-      InputFile inputFile = io.newInputFile(initialOffsetLocation);
-      if (inputFile.exists()) {
-        return readOffset(inputFile);
+      if (offsetExists()) {
+        return readOffset();
       }
 
-      table.refresh();
-      StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table, fromTimestamp);
+      table().refresh();
+      StreamingOffset offset = MicroBatchUtils.determineStartingOffset(table(), fromTimestamp());
+      writeOffset(offset);
+      return offset;
+    }
 
-      OutputFile outputFile = io.newOutputFile(initialOffsetLocation);
-      writeOffset(offset, outputFile);
+    protected abstract boolean offsetExists();
 
-      return offset;
+    protected abstract StreamingOffset readOffset();
+
+    protected abstract void writeOffset(StreamingOffset offset);
+  }
+
+  private static class TableIOOffsetStore extends BaseOffsetStore {
+    private final FileIO io;
+
+    TableIOOffsetStore(Table table, String checkpointLocation, Long fromTimestamp) {
+      super(table, checkpointLocation, fromTimestamp);
+      this.io = table.io();
     }
 
-    private void writeOffset(StreamingOffset offset, OutputFile file) {
+    @Override
+    protected boolean offsetExists() {
+      InputFile inputFile = io.newInputFile(initialOffsetLocation());
+      return inputFile.exists();
+    }
+
+    @Override
+    protected StreamingOffset readOffset() {
+      InputFile file = io.newInputFile(initialOffsetLocation());
+      try (InputStream in = file.newStream()) {
+        return StreamingOffset.fromJson(in);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException(
+            String.format("Failed reading offset from: %s", initialOffsetLocation()), ioException);
+      }
+    }
+
+    @Override
+    protected void writeOffset(StreamingOffset offset) {
+      OutputFile file = io.newOutputFile(initialOffsetLocation());
       try (OutputStream outputStream = file.create()) {
         BufferedWriter writer =
             new BufferedWriter(new OutputStreamWriter(outputStream, StandardCharsets.UTF_8));
         writer.write(offset.json());
         writer.flush();
       } catch (IOException ioException) {
         throw new UncheckedIOException(
-            String.format("Failed writing offset to: %s", initialOffsetLocation), ioException);
+            String.format("Failed writing offset to: %s", initialOffsetLocation()), ioException);
       }
     }
+  }
 
-    private StreamingOffset readOffset(InputFile file) {
-      try (InputStream in = file.newStream()) {
-        return StreamingOffset.fromJson(in);
+  private static class HadoopOffsetStore extends BaseOffsetStore {
+    private final FileSystem fileSystem;
+    private final Path initialOffsetPath;
+
+    HadoopOffsetStore(
+        Table table, String checkpointLocation, Long fromTimestamp, Configuration hadoopConf) {
+      super(table, checkpointLocation, fromTimestamp);
+      this.initialOffsetPath = new Path(initialOffsetLocation());
+
+      try {
+        this.fileSystem = initialOffsetPath.getFileSystem(hadoopConf);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException(
+            String.format("Failed to get FileSystem for: %s", initialOffsetPath), ioException);
+      }
+    }
+
+    @Override
+    protected boolean offsetExists() {
+      try {
+        return fileSystem.exists(initialOffsetPath);
+      } catch (IOException ioException) {
+        throw new UncheckedIOException(
+            String.format("Failed to check existence of: %s", initialOffsetPath), ioException);
+      }
+    }
+
+    @Override
+    protected StreamingOffset readOffset() {
+      try (FSDataInputStream inputStream = fileSystem.open(initialOffsetPath);
+          InputStreamReader reader = new InputStreamReader(inputStream, StandardCharsets.UTF_8)) {
+        String json = CharStreams.toString(reader);

Review Comment:
   Don't we already have a StreamingOffset.fromJson(InputStream in) ? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to