This is an automated email from the ASF dual-hosted git repository.

hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git


The following commit(s) were added to refs/heads/main by this push:
     new 837a491117 Fix JSONinput doesn't stop processing empty files after 
detecting that they are empty (#7104)
837a491117 is described below

commit 837a4911171797c904898bd26122b5b58530d295
Author: Lance <[email protected]>
AuthorDate: Mon May 18 21:09:46 2026 +0800

    Fix JSONinput doesn't stop processing empty files after detecting that they 
are empty (#7104)
    
    rebase fix
    
    Signed-off-by: lance <[email protected]>
---
 .../pipeline/transforms/jsoninput/JsonInput.java   | 158 +++---
 .../transforms/jsoninput/JsonInputData.java        |  13 +-
 .../jsoninput/reader/FastJsonReader.java           |   5 +
 .../transforms/jsoninput/reader/IJsonReader.java   |   6 +
 .../transforms/jsoninput/reader/InputsReader.java  |   8 +-
 .../transforms/jsoninput/JsonInputDataTests.java   |  59 +++
 .../transforms/jsoninput/JsonInputTest.java        | 208 +++++---
 .../jsoninput/reader/FastJsonReaderTest.java       |  28 +-
 .../jsoninput/reader/InputsReaderTests.java        | 586 +++++++++++++++++++++
 9 files changed, 907 insertions(+), 164 deletions(-)

diff --git 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
index 3073c1c159..59ecb79ec1 100644
--- 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
+++ 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
@@ -20,7 +20,9 @@ package org.apache.hop.pipeline.transforms.jsoninput;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
+import java.io.PushbackInputStream;
 import java.util.ArrayList;
 import java.util.BitSet;
 import java.util.List;
@@ -111,6 +113,24 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
 
   @Override
   public boolean processRow() throws HopException {
+    beginProcessRowIteration();
+    try {
+      if (!writeOutputRowIfPossible(getOneOutputRow())) {
+        return false;
+      }
+    } catch (JsonInputException e) {
+      if (!handleJsonInputExceptionInProcessRow()) {
+        return false;
+      }
+    } catch (Exception e) {
+      if (!handleUnexpectedExceptionInProcessRow(e)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private void beginProcessRowIteration() throws HopException {
     if (first) {
       first = false;
       prepareToRowProcessing(false);
@@ -121,58 +141,57 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
             BaseMessages.getString(PKG, "JsonInput.Log.ReceivingMultiRows"));
       }
     }
+  }
 
-    Object[] outRow;
-    try {
-      // Grab a row
-      outRow = getOneOutputRow();
-      if (outRow == null) {
-        setOutputDone(); // signal end to receiver(s)
-        return false; // end of data or error.
-      }
-
-      if (isRowLevel()) {
-        logRowlevel(
-            BaseMessages.getString(
-                PKG, "JsonInput.Log.ReadRow", 
data.outputRowMeta.getString(outRow)));
-      }
-      incrementLinesInput();
-      data.rownr++;
-
-      putRow(data.outputRowMeta, outRow); // copy row to output rowset(s)
-
-      if (meta.getRowLimit() > 0 && data.rownr > meta.getRowLimit()) {
-        // limit has been reached: stop now.
-        setOutputDone();
-        return false;
-      }
+  private boolean writeOutputRowIfPossible(Object[] outRow) throws 
HopException {
+    if (outRow == null) {
+      setOutputDone();
+      return false;
+    }
+    if (isRowLevel()) {
+      logRowlevel(
+          BaseMessages.getString(
+              PKG, "JsonInput.Log.ReadRow", 
data.outputRowMeta.getString(outRow)));
+    }
+    incrementLinesInput();
+    data.rownr++;
+    putRow(data.outputRowMeta, outRow);
+    if (meta.getRowLimit() > 0 && data.rownr > meta.getRowLimit()) {
+      setOutputDone();
+      return false;
+    }
+    return true;
+  }
 
-    } catch (JsonInputException e) {
-      if (!getTransformMeta().isDoingErrorHandling()) {
-        stopErrorExecution(e);
-        return false;
-      }
-    } catch (Exception e) {
-      logError(BaseMessages.getString(PKG, 
"JsonInput.ErrorInTransformRunning", e.getMessage()));
-      if (getTransformMeta().isDoingErrorHandling()) {
-        sendErrorRow(e.toString());
-      } else {
-        incrementErrors();
-        stopErrorExecution(e);
-        return false;
-      }
+  private boolean handleJsonInputExceptionInProcessRow() {
+    if (!getTransformMeta().isDoingErrorHandling()) {
+      stopErrorExecution();
+      return false;
     }
     return true;
   }
 
-  private void stopErrorExecution(Exception e) {
+  /**
+   * @return false if execution must stop
+   */
+  private boolean handleUnexpectedExceptionInProcessRow(Exception e) {
+    logError(BaseMessages.getString(PKG, "JsonInput.ErrorInTransformRunning", 
e.getMessage()));
+    if (getTransformMeta().isDoingErrorHandling()) {
+      sendErrorRow(e.toString());
+      return true;
+    }
+    incrementErrors();
+    stopErrorExecution();
+    return false;
+  }
+
+  private void stopErrorExecution() {
     stopAll();
     setOutputDone();
   }
 
   @Override
-  protected void prepareToRowProcessing(boolean errorIgnored)
-      throws HopException, HopTransformException, HopValueException {
+  protected void prepareToRowProcessing(boolean errorIgnored) throws 
HopException {
     data.readrow = getRow();
     data.inputRowMeta = getInputRowMeta();
     if (!meta.isInFields() && data.inputRowMeta == null) {
@@ -264,18 +283,20 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
       inputError(errMsg);
       return false;
     }
+
+    if (file.getContent().getSize() == 0 && meta.isIgnoringEmptyFile()) {
+      logBasic(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", "" 
+ file.getName()));
+      // Skip opening/parsing this file; InputsReader will advance to the next 
file.
+      data.skipEmptyFile = true;
+      return false;
+    }
     if (hasAdditionalFileFields()) {
       fillFileAdditionalFields(data, file);
     }
     if (file.getContent().getSize() == 0) {
-      // log only basic as a warning (was before logError)
-      if (meta.isIgnoringEmptyFile()) {
-        logBasic(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", 
"" + file.getName()));
-      } else {
-        logError(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", 
"" + file.getName()));
-        incrementErrors();
-        return false;
-      }
+      logError(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", "" 
+ file.getName()));
+      incrementErrors();
+      return false;
     }
     return true;
   }
@@ -293,22 +314,30 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
   }
 
   private void parseNextInputToRowSet(InputStream input) throws HopException {
-    try {
-      data.readerRowSet = data.reader.parseStringValue(input);
+    // Large pushback buffer: some streams pre-read; avoids buffer overflow on 
unread(1).
+    try (PushbackInputStream pb = new PushbackInputStream(input, 8192)) {
+      if (meta.isIgnoringEmptyFile()) {
+        int first = pb.read();
+        if (first < 0) {
+          data.readerRowSet = data.jsonReader.emptyFieldRowSet();
+          return;
+        }
+        pb.unread(first);
+        input = pb;
+      }
+      data.readerRowSet = data.jsonReader.parseStringValue(input);
     } catch (HopException ke) {
       logInputError(ke);
       throw new JsonInputException(ke);
     } catch (Exception e) {
       logInputError(e);
       throw new JsonInputException(e);
-    } finally {
-      closeQuietly(input);
     }
   }
 
   private void parseNextJsonToRowSet(JsonNode node) throws HopException {
     try {
-      data.readerRowSet = data.reader.parseJsonNodeValue(node);
+      data.readerRowSet = data.jsonReader.parseJsonNodeValue(node);
     } catch (Exception e) {
       logInputError(e);
       throw new JsonInputException(e);
@@ -395,7 +424,7 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
   }
 
   /** get final row for output */
-  private Object[] getOneOutputRow() throws HopException {
+  private Object[] getOneOutputRow() throws HopException, IOException {
     if (meta.isInFields() && !data.hasFirstRow) {
       return null;
     }
@@ -427,7 +456,7 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
           InputStream nextIn = data.inputs.next();
           if (nextIn != null) {
             CountingInputStream countingIn = new CountingInputStream(nextIn);
-            try {
+            try (countingIn) {
               parseNextInputToRowSet(countingIn);
             } finally {
               long bytesRead = countingIn.getCount();
@@ -450,7 +479,13 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
               BaseTransform.closeQuietly(countingIn);
             }
           } else {
-            parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+            // Null stream (e.g. null field, iterator error path): do not 
parse "{}" when ignoring
+            // empty input — that would still run JSONPath and fail if paths 
are missing.
+            if (meta.isIgnoringEmptyFile()) {
+              data.readerRowSet = data.jsonReader.emptyFieldRowSet();
+            } else {
+              parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+            }
           }
         } else {
           if (isDetailed()) {
@@ -551,7 +586,7 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
     }
     // Add RootUri
     if (!Utils.isEmpty(meta.getRootUriField())) {
-      outputRowData[rowIndex++] = data.rootUriName;
+      outputRowData[rowIndex] = data.rootUriName;
     }
   }
 
@@ -565,8 +600,9 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
       inputFields[i] = field;
     }
     // Instead of putting in the meta.inputFields, we put in our json path 
resolved input fields
-    data.reader = new FastJsonReader(inputFields, 
meta.isDefaultPathLeafToNull(), getLogChannel());
-    data.reader.setIgnoreMissingPath(meta.isIgnoringMissingPath());
+    data.jsonReader =
+        new FastJsonReader(inputFields, meta.isDefaultPathLeafToNull(), 
getLogChannel());
+    data.jsonReader.setIgnoreMissingPath(meta.isIgnoringMissingPath());
   }
 
   @Override
@@ -575,7 +611,7 @@ public class JsonInput extends 
BaseFileInputTransform<JsonInputMeta, JsonInputDa
       closeQuietly(data.file);
     }
     data.inputs = null;
-    data.reader = null;
+    data.jsonReader = null;
     data.readerRowSet = null;
     data.repeatedFields = null;
     super.dispose();
diff --git 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
index a7abee5571..77c82ef72f 100644
--- 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
+++ 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
@@ -49,11 +49,20 @@ public class JsonInputData extends 
BaseFileInputTransformData implements ITransf
   public int indexSourceField;
 
   public Iterator<InputStream> inputs;
-  public Iterator<JsonNode> jsonInputs; // if incoming field is JsonNode
-  public IJsonReader reader;
+
+  /** if incoming field is JsonNode */
+  public Iterator<JsonNode> jsonInputs;
+
+  public IJsonReader jsonReader;
   public IRowSet readerRowSet;
   public BitSet repeatedFields;
 
+  /**
+   * When {@link JsonInput#onNewFile} rejects an empty file with "ignore empty 
file" enabled, the
+   * file iterator loops to the next file instead of substituting placeholder 
JSON.
+   */
+  public boolean skipEmptyFile;
+
   public JsonInputData() {
     super();
     nr_repeats = 0;
diff --git 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
index 294dd9e4bc..1f91a5d506 100644
--- 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
+++ 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
@@ -183,6 +183,11 @@ public class FastJsonReader implements IJsonReader {
     this.paths = compilePaths(fields);
   }
 
+  @Override
+  public IRowSet emptyFieldRowSet() {
+    return getEmptyResponse();
+  }
+
   @Override
   public IRowSet parseStringValue(InputStream in) throws HopException {
     readInput(in);
diff --git 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
index 11836521fa..cd9f1a9e68 100644
--- 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
+++ 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
@@ -41,4 +41,10 @@ public interface IJsonReader {
 
   /** parse incoming JsonNode fields into a rowset */
   public IRowSet parseJsonNodeValue(com.fasterxml.jackson.databind.JsonNode 
in) throws HopException;
+
+  /**
+   * One logical output row with a null for each configured field, without 
reading a JSON document
+   * or evaluating JSON paths (used when input is an empty stream and empty 
files are ignored).
+   */
+  IRowSet emptyFieldRowSet();
 }
diff --git 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
index e5202e3bd7..a5749b6033 100644
--- 
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
+++ 
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
@@ -147,7 +147,7 @@ public class InputsReader implements Iterable<InputStream> {
 
     @Override
     public InputStream tryNext() {
-      if (hasNext()) {
+      while (hasNext()) {
         if (data.file != null) {
           try {
             data.file.close();
@@ -161,8 +161,14 @@ public class InputsReader implements Iterable<InputStream> 
{
           if (transform.onNewFile(data.file)) {
             return HopVfs.getInputStream(data.file);
           }
+          if (data instanceof JsonInputData jsonData && 
jsonData.skipEmptyFile) {
+            jsonData.skipEmptyFile = false;
+            continue;
+          }
+          return null;
         } catch (FileSystemException e) {
           handler.fileOpenError(data.file, e);
+          return null;
         }
       }
       return null;
diff --git 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
new file mode 100644
index 0000000000..844b4fbcad
--- /dev/null
+++ 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.jsoninput;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link JsonInputData} */
+class JsonInputDataTests {
+
+  @Test
+  void constructorInitializesTransformDataDefaults() {
+    JsonInputData data = new JsonInputData();
+
+    assertEquals(0, data.nr_repeats);
+    assertNull(data.previousRow);
+    assertEquals(0, data.filenr);
+    assertEquals(-1, data.indexSourceField);
+    assertEquals(-1, data.nrInputFields);
+    assertNull(data.readrow);
+    assertEquals(0, data.totalpreviousfields);
+    assertFalse(data.hasFirstRow);
+    assertEquals(0L, data.rownr);
+  }
+
+  @Test
+  void skipEmptyFileDefaultsToFalse() {
+    JsonInputData data = new JsonInputData();
+    assertFalse(data.skipEmptyFile);
+  }
+
+  @Test
+  void skipEmptyFileIsMutableFlagForFileIterator() {
+    JsonInputData data = new JsonInputData();
+    data.skipEmptyFile = true;
+    assertTrue(data.skipEmptyFile);
+    data.skipEmptyFile = false;
+    assertFalse(data.skipEmptyFile);
+  }
+}
diff --git 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
index 4bc86e2a47..32767af2c1 100644
--- 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
+++ 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
@@ -41,7 +41,6 @@ import java.util.zip.ZipOutputStream;
 import org.apache.commons.io.IOUtils;
 import org.apache.commons.vfs2.FileObject;
 import org.apache.commons.vfs2.FileSystemException;
-import org.apache.commons.vfs2.util.FileObjectUtils;
 import org.apache.hop.core.HopClientEnvironment;
 import org.apache.hop.core.IRowSet;
 import org.apache.hop.core.exception.HopException;
@@ -78,71 +77,73 @@ class JsonInputTest {
 
   protected static String getBasicTestJson() {
     return """
-            {
-              "home": {},
-              "store": {
-                "book": [
-                  {
-                    "category": "reference",
-                    "author": "Nigel Rees",
-                    "title": "Sayings of the Century",
-                    "price": 8.95
-                  },
-                  {
-                    "category": "fiction",
-                    "author": "Evelyn Waugh",
-                    "title": "Sword of Honour",
-                    "price": 12.99
-                  },
-                  {
-                    "category": "fiction",
-                    "author": "Herman Melville",
-                    "title": "Moby Dick",
-                    "isbn": "0-553-21311-3",
-                    "price": 8.99
-                  },
-                  {
-                    "category": "fiction",
-                    "author": "J. R. R. Tolkien",
-                    "title": "The Lord of the Rings",
-                    "isbn": "0-395-19395-8",
-                    "price": 22.99
-                  }
-                ],
-                "bicycle": {
-                  "color": "red",
-                  "price": 19.95
-                }
-              }
-            }""";
+                               {
+                                 "home": {},
+                                 "store": {
+                                   "book": [
+                                     {
+                                       "category": "reference",
+                                       "author": "Nigel Rees",
+                                       "title": "Sayings of the Century",
+                                       "price": 8.95
+                                     },
+                                     {
+                                       "category": "fiction",
+                                       "author": "Evelyn Waugh",
+                                       "title": "Sword of Honour",
+                                       "price": 12.99
+                                     },
+                                     {
+                                       "category": "fiction",
+                                       "author": "Herman Melville",
+                                       "title": "Moby Dick",
+                                       "isbn": "0-553-21311-3",
+                                       "price": 8.99
+                                     },
+                                     {
+                                       "category": "fiction",
+                                       "author": "J. R. R. Tolkien",
+                                       "title": "The Lord of the Rings",
+                                       "isbn": "0-395-19395-8",
+                                       "price": 22.99
+                                     }
+                                   ],
+                                   "bicycle": {
+                                     "color": "red",
+                                     "price": 19.95
+                                   }
+                                 }
+                               }""";
   }
 
   private static String getPDI17060Json() {
-    return "{"
-        + " \"path\": 
\"/board/offer-sources/phases/current/cards/acquisitions\","
-        + " \"id\": \"acquisitions\","
-        + " \"template\": \"offer-sources\","
-        + " \"creator\": \"admin\","
-        + " \"created\": 1491703768197,"
-        + " \"modifiedby\": null,"
-        + " \"modified\": null,"
-        + " \"color\": \"blue\","
-        + " \"fields\": {"
-        + "   \"group-detail\": \"Offer Source Details\","
-        + "   \"name\": \"Acquisitions\""
-        + " },"
-        + " \"tasks\": 0,"
-        + " \"history\": 1,"
-        + " \"attachments\": 0,"
-        + " \"comments\": 0,"
-        + " \"alerts\": 0,"
-        + " \"title\": \"Acquisitions\","
-        + " \"lock\": null,"
-        + " \"completeTasks\": null,"
-        + " \"phase\": \"current\","
-        + " \"errors\": null,"
-        + " \"board\": \"offer-sources\""
-        + "}";
+    return """
+        {
+          "path": "/board/offer-sources/phases/current/cards/acquisitions",
+          "id": "acquisitions",
+          "template": "offer-sources",
+          "creator": "admin",
+          "created": 1491703768197,
+          "modifiedby": null,
+          "modified": null,
+          "color": "blue",
+          "fields": {
+            "group-detail": "Offer Source Details",
+            "name": "Acquisitions"
+          },
+          "tasks": 0,
+          "history": 1,
+          "attachments": 0,
+          "comments": 0,
+          "alerts": 0,
+          "title": "Acquisitions",
+          "lock": null,
+          "completeTasks": null,
+          "phase": "current",
+          "errors": null,
+          "board": "offer-sources"
+        }
+        """;
   }
 
   @BeforeAll
@@ -521,18 +522,16 @@ class JsonInputTest {
   @Test
   void testIfIgnorePathDoNotSkipRowIfInputIsNullOrFieldNotFound() throws 
Exception {
 
-    final String input1 =
-        """
-            { "value1": "1",
-              "value2": "2",
-            }""";
+    final String input1 = """
+                                               { "value1": "1",
+                                                 "value2": "2",
+                                               }""";
     final String input2 = "{ \"value1\": \"3\"" + "}";
     final String input3 = "{ \"value2\": \"4\"" + "}";
-    final String input4 =
-        """
-            { "value1": null,
-              "value2": null,
-            }""";
+    final String input4 = """
+                                               { "value1": null,
+                                                 "value2": null,
+                                               }""";
     final String input5 = "{}";
     final String input6 = null;
 
@@ -618,11 +617,10 @@ class JsonInputTest {
 
   @Test
   void testPathMissingIgnore() throws Exception {
-    final String input =
-        """
-            { "value1": "1",
-              "value2": "2",
-            }""";
+    final String input = """
+                                               { "value1": "1",
+                                                 "value2": "2",
+                                               }""";
     final String inCol = "input";
 
     JsonInputField aField = new JsonInputField();
@@ -1005,11 +1003,6 @@ class JsonInputTest {
         }
       }
 
-      String json =
-          FileObjectUtils.getContentAsString(
-              HopVfs.getFileObject("zip:" + BASE_RAM_DIR + 
"test.zip!/test.json"),
-              StandardCharsets.UTF_8);
-
       JsonInputField price = new JsonInputField();
       price.setName("price");
       price.setType(IValueMeta.TYPE_NUMBER);
@@ -1141,6 +1134,48 @@ class JsonInputTest {
     }
   }
 
+  @Test
+  void testZeroSizeFileIgnoredThenReadsNext() throws Exception {
+    helper.redirectLog(new ByteArrayOutputStream(), LogLevel.BASIC);
+    try (LocaleChange enUs = new LocaleChange(Locale.US)) {
+      FileObject empty = HopVfs.getFileObject(BASE_RAM_DIR + "empty.json");
+      empty.createFile();
+      FileObject good = HopVfs.getFileObject(BASE_RAM_DIR + "good.json");
+      try (OutputStream os = good.getContent().getOutputStream()) {
+        IOUtils.write(getBasicTestJson(), os, StandardCharsets.UTF_8);
+      }
+
+      JsonInputField price = new JsonInputField();
+      price.setName("price");
+      price.setType(IValueMeta.TYPE_NUMBER);
+      price.setPath("$..book[*].price");
+
+      JsonInputMeta meta = createSimpleMeta("in file", price);
+      meta.setIsAFile(true);
+      meta.setRemoveSourceField(true);
+      meta.setIgnoringEmptyFile(true);
+      meta.setIgnoringMissingPath(false);
+
+      JsonInput jsonInput =
+          createJsonInput(
+              "in file",
+              meta,
+              new Object[][] {
+                new Object[] {BASE_RAM_DIR + "empty.json"},
+                new Object[] {BASE_RAM_DIR + "good.json"}
+              });
+      RowComparatorListener rowComparator =
+          new RowComparatorListener(
+              new Object[] {8.95}, new Object[] {12.99}, new Object[] {8.99}, 
new Object[] {22.99});
+      jsonInput.addRowListener(rowComparator);
+      processRows(jsonInput, 8);
+      assertEquals(0, jsonInput.getErrors());
+      assertEquals(4, jsonInput.getLinesWritten());
+    } finally {
+      deleteFiles();
+    }
+  }
+
   @Test
   void testBracketEscape() throws Exception {
     String input = "{\"a\":1,\"b(1)\":2}";
@@ -1201,7 +1236,7 @@ class JsonInputTest {
         });
     processRows(jsonInput, 3);
     assertEquals(1, errorLines.size(), "fwd error");
-    assertEquals(input1, errorLines.get(0)[0], "input in err line");
+    assertEquals(input1, errorLines.getFirst()[0], "input in err line");
     assertEquals(1, jsonInput.getLinesWritten(), "rows written");
   }
 
@@ -1463,7 +1498,7 @@ class JsonInputTest {
   }
 
   /** compare json (deep equals ignoring order) */
-  protected static final boolean jsonEquals(String json1, String json2) throws 
Exception {
+  protected static boolean jsonEquals(String json1, String json2) throws 
Exception {
     ObjectMapper om = HopJson.newMapper();
     JsonNode parsedJson1 = om.readTree(json1);
     JsonNode parsedJson2 = om.readTree(json2);
@@ -1503,11 +1538,10 @@ class JsonInputTest {
     b.setPath("$..B.F2");
     b.setType(IValueMeta.TYPE_STRING);
     // Create two meta inputs with two different orders a,b and b,a
-    List results = new ArrayList<>();
+    List<Object> results = new ArrayList<>();
     List<JsonInputMeta> metas =
         Arrays.asList(createSimpleMeta("json", a, b), createSimpleMeta("json", 
b, a));
     for (JsonInputMeta meta : metas) {
-      JsonInputMeta metaAB = createSimpleMeta("json", a, b);
       JsonInput jsonInput =
           createJsonInput(
               "json", meta, new Object[] {"{'B':{'F2': one}, 'C':{'B': {'F2': 
three}}}"});
diff --git 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
index 254a81e320..7f44079b78 100644
--- 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
+++ 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
@@ -18,7 +18,9 @@
 package org.apache.hop.pipeline.transforms.jsoninput.reader;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import com.jayway.jsonpath.Option;
@@ -40,24 +42,24 @@ class FastJsonReaderTest {
   private static final Option[] OPTIONS_WO_DEFAULT_PATH_LEAF_TO_NULL = {
     Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST
   };
-  private EnumSet<Option> expectedOptions = EnumSet.noneOf(Option.class);
+  private final EnumSet<Option> expectedOptions = EnumSet.noneOf(Option.class);
   private JsonInputField[] fields;
 
   private FastJsonReader fJsonReader;
-  private ILogChannel logMock = mock(ILogChannel.class);
+  private final ILogChannel logMock = mock(ILogChannel.class);
 
   @BeforeEach
-  void setUp() throws Exception {
+  void setUp() {
     fields = new JsonInputField[] {};
   }
 
   @Test
-  void testFastJsonReaderCreated_Default() throws HopException {
+  void testFastJsonReaderCreated_Default() {
     fJsonReader = new FastJsonReader(logMock);
     expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
     assertNotNull(fJsonReader);
-    assertEquals(false, fJsonReader.isIgnoreMissingPath());
-    assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+    assertFalse(fJsonReader.isIgnoreMissingPath());
+    assertTrue(fJsonReader.isDefaultPathLeafToNull());
     assertEquals(expectedOptions, 
fJsonReader.getJsonConfiguration().getOptions());
   }
 
@@ -66,8 +68,8 @@ class FastJsonReaderTest {
     expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
     fJsonReader = new FastJsonReader(fields, logMock);
     assertNotNull(fJsonReader);
-    assertEquals(false, fJsonReader.isIgnoreMissingPath());
-    assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+    assertFalse(fJsonReader.isIgnoreMissingPath());
+    assertTrue(fJsonReader.isDefaultPathLeafToNull());
     assertEquals(expectedOptions, 
fJsonReader.getJsonConfiguration().getOptions());
   }
 
@@ -76,8 +78,8 @@ class FastJsonReaderTest {
     
expectedOptions.addAll(Arrays.asList(OPTIONS_WO_DEFAULT_PATH_LEAF_TO_NULL));
     fJsonReader = new FastJsonReader(fields, false, logMock);
     assertNotNull(fJsonReader);
-    assertEquals(false, fJsonReader.isIgnoreMissingPath());
-    assertEquals(false, fJsonReader.isDefaultPathLeafToNull());
+    assertFalse(fJsonReader.isIgnoreMissingPath());
+    assertFalse(fJsonReader.isDefaultPathLeafToNull());
     assertEquals(expectedOptions, 
fJsonReader.getJsonConfiguration().getOptions());
   }
 
@@ -86,13 +88,13 @@ class FastJsonReaderTest {
     expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
     fJsonReader = new FastJsonReader(fields, true, logMock);
     assertNotNull(fJsonReader);
-    assertEquals(false, fJsonReader.isIgnoreMissingPath());
-    assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+    assertFalse(fJsonReader.isIgnoreMissingPath());
+    assertTrue(fJsonReader.isDefaultPathLeafToNull());
     assertEquals(expectedOptions, 
fJsonReader.getJsonConfiguration().getOptions());
   }
 
   @Test
-  void testFastJsonReaderGetMaxRowSize() throws HopException {
+  void testFastJsonReaderGetMaxRowSize() {
     List<List<Integer>> mainList = new ArrayList<>();
     List<Integer> l1 = new ArrayList<>();
     List<Integer> l2 = new ArrayList<>();
diff --git 
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
new file mode 100644
index 0000000000..2e5d9cb919
--- /dev/null
+++ 
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.jsoninput.reader;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.hop.core.exception.HopPluginException;
+import org.apache.hop.core.fileinput.FileInputList;
+import org.apache.hop.core.json.HopJson;
+import org.apache.hop.core.logging.ILoggingObject;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaFactory;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.junit.rules.RestoreHopEngineEnvironmentExtension;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInput;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputData;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputField;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputMeta;
+import org.apache.hop.pipeline.transforms.mock.TransformMockHelper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link InputsReader}. Placed in this package to exercise 
protected {@link
+ * InputsReader#getFieldIterator()} and nested iterators.
+ */
+@ExtendWith(RestoreHopEngineEnvironmentExtension.class)
+class InputsReaderTests {
+
+  private static final String BASE_RAM_DIR = "ram:///inputsReaderTest/";
+  private static final String BASIC_JSON =
+      "{\"store\":{\"book\":[{\"price\":1.0},{\"price\":2.0}]}}";
+
+  private TransformMockHelper<JsonInputMeta, JsonInputData> helper;
+
+  @BeforeEach
+  void setUp() {
+    helper =
+        new TransformMockHelper<>("inputs reader test", JsonInputMeta.class, 
JsonInputData.class);
+    when(helper.logChannelFactory.create(any(), any(ILoggingObject.class)))
+        .thenReturn(helper.iLogChannel);
+    when(helper.pipeline.isRunning()).thenReturn(true);
+  }
+
+  @AfterEach
+  void tearDown() throws Exception {
+    helper.cleanUp();
+    try (FileObject baseDir = HopVfs.getFileObject(BASE_RAM_DIR)) {
+      baseDir.deleteAll();
+    }
+  }
+
+  private static final class RecordingErrorHandler implements 
InputsReader.ErrorHandler {
+    final List<Exception> errors = new ArrayList<>();
+    final List<FileSystemException> fileOpenErrors = new ArrayList<>();
+    final List<FileSystemException> fileCloseErrors = new ArrayList<>();
+
+    @Override
+    public void error(Exception thrown) {
+      errors.add(thrown);
+    }
+
+    @Override
+    public void fileOpenError(FileObject file, FileSystemException exception) {
+      fileOpenErrors.add(exception);
+    }
+
+    @Override
+    public void fileCloseError(FileObject file, FileSystemException exception) 
{
+      fileCloseErrors.add(exception);
+    }
+  }
+
+  /** JsonInput.init() requires at least one field so FastJsonReader can be 
built. */
+  private static void ensureMinimalInputField(JsonInputMeta meta) {
+    if (meta.getInputFields().isEmpty()) {
+      JsonInputField f = new JsonInputField();
+      f.setName("_");
+      f.setPath("$");
+      f.setType(IValueMeta.TYPE_STRING);
+      meta.getInputFields().add(f);
+    }
+  }
+
+  private JsonInputMeta createFileListMeta(final List<FileObject> files) {
+    JsonInputMeta meta =
+        new JsonInputMeta() {
+          @Override
+          public FileInputList getFileInputList(IVariables variables) {
+            return new FileInputList() {
+              @Override
+              public List<FileObject> getFiles() {
+                return files;
+              }
+
+              @Override
+              public int nrOfFiles() {
+                return files.size();
+              }
+            };
+          }
+        };
+    meta.setDefault();
+    meta.setInFields(false);
+    ensureMinimalInputField(meta);
+    return meta;
+  }
+
+  private JsonInput newJsonInputWithoutInputRows(JsonInputMeta meta, 
JsonInputData data) {
+    ensureMinimalInputField(meta);
+    JsonInput jsonInput =
+        new JsonInput(helper.transformMeta, meta, data, 0, 
helper.pipelineMeta, helper.pipeline);
+    jsonInput.init();
+    return jsonInput;
+  }
+
+  private JsonInput newJsonInputWithStringRows(
+      JsonInputMeta meta, JsonInputData data, String columnName, Object[]... 
rows)
+      throws HopPluginException {
+    ensureMinimalInputField(meta);
+    JsonInput jsonInput =
+        new JsonInput(helper.transformMeta, meta, data, 0, 
helper.pipelineMeta, helper.pipeline);
+    IRowMeta rowMeta = new RowMeta();
+    rowMeta.addValueMeta(ValueMetaFactory.createValueMeta(columnName, 
IValueMeta.TYPE_STRING));
+    jsonInput.setInputRowMeta(rowMeta);
+    jsonInput.addRowSetToInputRowSets(helper.getMockInputRowSet(rows));
+    jsonInput.init();
+    return jsonInput;
+  }
+
+  private JsonInput newJsonInputWithJsonRows(
+      JsonInputMeta meta, JsonInputData data, Object[]... rows) throws 
HopPluginException {
+    ensureMinimalInputField(meta);
+    JsonInput jsonInput =
+        new JsonInput(helper.transformMeta, meta, data, 0, 
helper.pipelineMeta, helper.pipeline);
+    IRowMeta rowMeta = new RowMeta();
+    rowMeta.addValueMeta(ValueMetaFactory.createValueMeta("j", 
IValueMeta.TYPE_JSON));
+    jsonInput.setInputRowMeta(rowMeta);
+    jsonInput.addRowSetToInputRowSets(helper.getMockInputRowSet(rows));
+    jsonInput.init();
+    return jsonInput;
+  }
+
+  @Test
+  void iterator_staticFileList_returnsStreamsForEachFile() throws Exception {
+    try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "a.json")) {
+      f1.createFile();
+      try (OutputStream os = f1.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+      JsonInputMeta meta = createFileListMeta(List.of(f1));
+      JsonInputData data = new JsonInputData();
+      JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+      InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+
+      var it = reader.iterator();
+      assertTrue(it.hasNext());
+      try (InputStream in = it.next()) {
+        assertNotNull(in);
+        String body = IOUtils.toString(in, StandardCharsets.UTF_8);
+        assertTrue(body.contains("store"));
+      }
+      assertFalse(it.hasNext());
+    }
+  }
+
+  @Test
+  void iterator_cachesFileInputListOnData() throws Exception {
+    try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "cached.json")) {
+      f1.createFile();
+      try (OutputStream os = f1.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+      AtomicInteger listBuilds = new AtomicInteger();
+      JsonInputMeta meta =
+          new JsonInputMeta() {
+            @Override
+            public FileInputList getFileInputList(IVariables variables) {
+              listBuilds.incrementAndGet();
+              return new FileInputList() {
+                @Override
+                public List<FileObject> getFiles() {
+                  return List.of(f1);
+                }
+
+                @Override
+                public int nrOfFiles() {
+                  return 1;
+                }
+              };
+            }
+          };
+      meta.setDefault();
+      meta.setInFields(false);
+      ensureMinimalInputField(meta);
+      JsonInputData data = new JsonInputData();
+      JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+
+      new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler()).iterator();
+      assertEquals(1, listBuilds.get());
+      assertNotNull(data.files);
+
+      new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler()).iterator();
+      assertEquals(1, listBuilds.get(), "Second iterator() must reuse 
data.files");
+    }
+  }
+
+  @Test
+  void iterator_acceptingFilenames_resolvesPathsFromField() throws Exception {
+    try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + 
"fromfield.json")) {
+      f1.createFile();
+      try (OutputStream os = f1.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+      JsonInputMeta meta = new JsonInputMeta();
+      meta.setDefault();
+      ensureMinimalInputField(meta);
+      meta.setInFields(true);
+      meta.setFieldValue("pathcol");
+      meta.setIsAFile(true);
+      meta.getFileInput().setAcceptingFilenames(true);
+      meta.getFileInput().setAcceptingField("pathcol");
+
+      JsonInputData data = new JsonInputData();
+      data.indexSourceField = 0;
+      JsonInput jsonInput =
+          newJsonInputWithStringRows(meta, data, "pathcol", new Object[] 
{f1.getName().getURI()});
+
+      InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+      var it = reader.iterator();
+      assertTrue(it.hasNext());
+      try (InputStream in = it.next()) {
+        assertNotNull(in);
+        assertTrue(IOUtils.toString(in, 
StandardCharsets.UTF_8).contains("store"));
+      }
+      assertFalse(it.hasNext());
+    }
+  }
+
+  @Test
+  void iterator_directFieldContent_wrapsStringAsUtf8Stream() throws Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    meta.setInFields(true);
+    meta.setFieldValue("js");
+    meta.setSourceAFile(false);
+    meta.setReadUrl(false);
+
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "js", new 
Object[] {BASIC_JSON});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    var it = reader.iterator();
+    assertTrue(it.hasNext());
+    try (InputStream in = it.next()) {
+      assertEquals(BASIC_JSON, IOUtils.toString(in, StandardCharsets.UTF_8));
+    }
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  void iterator_readUrl_opensUrlStream() throws Exception {
+    File tmp = File.createTempFile("inputsreader", ".json");
+    tmp.deleteOnExit();
+    Files.writeString(tmp.toPath(), BASIC_JSON, StandardCharsets.UTF_8);
+    String url = tmp.toURI().toURL().toString();
+
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("loc");
+    meta.setSourceAFile(false);
+    meta.setReadUrl(true);
+
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "loc", new 
Object[] {url});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    var it = reader.iterator();
+    assertTrue(it.hasNext());
+    try (InputStream in = it.next()) {
+      assertTrue(IOUtils.toString(in, 
StandardCharsets.UTF_8).contains("store"));
+    }
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  void iterator_readUrl_badUrl_reportsErrorAndReturnsNullStream() throws 
Exception {
+    RecordingErrorHandler handler = new RecordingErrorHandler();
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("loc");
+    meta.setSourceAFile(false);
+    meta.setReadUrl(true);
+
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "loc", new 
Object[] {"bogus"});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, handler);
+    var it = reader.iterator();
+    assertTrue(it.hasNext());
+    assertNull(it.next());
+    assertEquals(1, handler.errors.size());
+  }
+
+  @Test
+  void iterator_skipsEmptyFileWhenConfigured_thenReturnsNextStream() throws 
Exception {
+    try (FileObject empty = HopVfs.getFileObject(BASE_RAM_DIR + "empty.json");
+        FileObject good = HopVfs.getFileObject(BASE_RAM_DIR + "good.json")) {
+      empty.createFile();
+      good.createFile();
+      try (OutputStream os = good.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+      JsonInputMeta meta = createFileListMeta(List.of(empty, good));
+      meta.setIgnoringEmptyFile(true);
+
+      JsonInputData data = new JsonInputData();
+      JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+      InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+
+      var it = reader.iterator();
+      assertTrue(it.hasNext());
+      try (InputStream in = it.next()) {
+        assertNotNull(in);
+        assertTrue(IOUtils.toString(in, 
StandardCharsets.UTF_8).contains("store"));
+      }
+      assertFalse(it.hasNext());
+      assertFalse(data.skipEmptyFile);
+    }
+  }
+
+  @Test
+  void iterator_onNewFileRejectsWithoutSkip_returnsNull() throws Exception {
+    try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "reject.json")) {
+      f1.createFile();
+      try (OutputStream os = f1.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+      JsonInputMeta meta = createFileListMeta(List.of(f1));
+      JsonInputData data = new JsonInputData();
+      JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+      JsonInput spyInput = spy(jsonInput);
+      
Mockito.doReturn(false).when(spyInput).onNewFile(org.mockito.ArgumentMatchers.any());
+
+      InputsReader reader = new InputsReader(spyInput, meta, data, new 
RecordingErrorHandler());
+      var it = reader.iterator();
+      assertTrue(it.hasNext());
+      assertNull(it.next());
+      assertFalse(it.hasNext());
+    }
+  }
+
+  @Test
+  void getFieldIterator_yieldsFieldStringsAndEnds() throws Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("c");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput =
+        newJsonInputWithStringRows(meta, data, "c", new Object[] {"a"}, new 
Object[] {"b"});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<String> fields = reader.getFieldIterator();
+    assertTrue(fields.hasNext());
+    assertEquals("a", fields.next());
+    assertTrue(fields.hasNext());
+    assertEquals("b", fields.next());
+    assertFalse(fields.hasNext());
+  }
+
+  @Test
+  void getFieldIterator_rowShorterThanIndex_returnsNull() throws Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("c");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 1;
+    JsonInput jsonInput =
+        newJsonInputWithStringRows(meta, data, "c", new Object[] 
{"only-one-col"});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<String> fields = reader.getFieldIterator();
+    assertTrue(fields.hasNext());
+    assertNull(fields.next());
+    assertFalse(fields.hasNext());
+  }
+
+  @Test
+  void getFieldIterator_nullCell_returnsNullString() throws Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("c");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "c", new 
Object[] {null});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<String> fields = reader.getFieldIterator();
+    assertTrue(fields.hasNext());
+    assertNull(fields.next());
+    assertFalse(fields.hasNext());
+  }
+
+  @Test
+  void getFieldIterator_nonStringCell_throwsClassCastException() throws 
Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("c");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "c", new 
Object[] {42});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<String> fields = reader.getFieldIterator();
+    assertTrue(fields.hasNext());
+    assertThrows(ClassCastException.class, fields::next);
+  }
+
+  @Test
+  void jsonFieldIterator_yieldsJsonNodes() throws Exception {
+    ObjectMapper mapper = HopJson.newMapper();
+    JsonNode node = mapper.readTree(BASIC_JSON);
+
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("j");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithJsonRows(meta, data, new Object[] 
{node});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<JsonNode> it = reader.jsonFieldIterator();
+    assertTrue(it.hasNext());
+    assertEquals(node, it.next());
+    assertFalse(it.hasNext());
+  }
+
+  @Test
+  void jsonFieldIterator_nonJsonCell_throwsClassCastException() throws 
Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    ensureMinimalInputField(meta);
+    meta.setInFields(true);
+    meta.setFieldValue("j");
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput =
+        newJsonInputWithStringRows(meta, data, "j", new Object[] 
{"not-json-node"});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<JsonNode> it = reader.jsonFieldIterator();
+    assertTrue(it.hasNext());
+    assertThrows(ClassCastException.class, it::next);
+  }
+
+  @Test
+  void iterators_removeUnsupported() throws Exception {
+    JsonInputMeta meta = new JsonInputMeta();
+    meta.setDefault();
+    meta.setInFields(true);
+    meta.setFieldValue("js");
+    meta.setSourceAFile(false);
+    meta.setReadUrl(false);
+    JsonInputData data = new JsonInputData();
+    data.indexSourceField = 0;
+    JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "js", new 
Object[] {"{}"});
+
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    Iterator<InputStream> iterator = reader.iterator();
+    assertThrows(UnsupportedOperationException.class, iterator::remove);
+
+    InputsReader.StringFieldIterator fieldIterator = reader.getFieldIterator();
+    assertThrows(UnsupportedOperationException.class, fieldIterator::remove);
+
+    Iterator<JsonNode> jsonNodeIterator = reader.jsonFieldIterator();
+    assertThrows(UnsupportedOperationException.class, 
jsonNodeIterator::remove);
+  }
+
+  @Test
+  void iterator_inFieldsAndSourceIsFile_usesFileBranch() throws Exception {
+    final String path = BASE_RAM_DIR + "isFile.json";
+    try (FileObject writeTo = HopVfs.getFileObject(path)) {
+      writeTo.createFile();
+      try (OutputStream os = writeTo.getContent().getOutputStream()) {
+        os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+      }
+    }
+
+    // New FileObject for the list so size/exists match what onNewFile + 
getInputStream see (RAM
+    // VFS).
+    try (FileObject f1 = HopVfs.getFileObject(path)) {
+      assertTrue(f1.exists(), "file must exist before iterator");
+      assertTrue(f1.getContent().getSize() > 0, "file must be non-empty before 
iterator");
+
+      JsonInputMeta meta = createFileListMeta(List.of(f1));
+      meta.setInFields(true);
+      // Use setIsAFile (not only Lombok setSourceAFile) so sourceAFile is set 
on anonymous
+      // JsonInputMeta subclasses.
+      meta.setIsAFile(true);
+      meta.getFileInput().setAcceptingFilenames(false);
+      meta.setFieldValue("_");
+
+      JsonInputData data = new JsonInputData();
+      JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+      InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+      var it = reader.iterator();
+      assertTrue(it.hasNext(), "!inFields || getIsAFile must be true to use 
file list iterator");
+      try (InputStream in = it.next()) {
+        assertNotNull(in, "expected stream after onNewFile accepted the file");
+        assertTrue(IOUtils.toString(in, 
StandardCharsets.UTF_8).contains("store"));
+      }
+      assertFalse(it.hasNext());
+    }
+  }
+
+  @Test
+  void iterator_whenNoFiles_hasNextFalse() {
+    JsonInputMeta meta = createFileListMeta(Collections.emptyList());
+    JsonInputData data = new JsonInputData();
+    JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+    InputsReader reader = new InputsReader(jsonInput, meta, data, new 
RecordingErrorHandler());
+    assertFalse(reader.iterator().hasNext());
+  }
+}

Reply via email to