This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new 837a491117 Fix JsonInput not stopping processing of empty files after
detecting that they are empty (#7104)
837a491117 is described below
commit 837a4911171797c904898bd26122b5b58530d295
Author: Lance <[email protected]>
AuthorDate: Mon May 18 21:09:46 2026 +0800
Fix JsonInput not stopping processing of empty files after detecting that they
are empty (#7104)
rebase fix
Signed-off-by: lance <[email protected]>
---
.../pipeline/transforms/jsoninput/JsonInput.java | 158 +++---
.../transforms/jsoninput/JsonInputData.java | 13 +-
.../jsoninput/reader/FastJsonReader.java | 5 +
.../transforms/jsoninput/reader/IJsonReader.java | 6 +
.../transforms/jsoninput/reader/InputsReader.java | 8 +-
.../transforms/jsoninput/JsonInputDataTests.java | 59 +++
.../transforms/jsoninput/JsonInputTest.java | 208 +++++---
.../jsoninput/reader/FastJsonReaderTest.java | 28 +-
.../jsoninput/reader/InputsReaderTests.java | 586 +++++++++++++++++++++
9 files changed, 907 insertions(+), 164 deletions(-)
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
index 3073c1c159..59ecb79ec1 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
@@ -20,7 +20,9 @@ package org.apache.hop.pipeline.transforms.jsoninput;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.io.ByteArrayInputStream;
+import java.io.IOException;
import java.io.InputStream;
+import java.io.PushbackInputStream;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
@@ -111,6 +113,24 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
@Override
public boolean processRow() throws HopException {
+ beginProcessRowIteration();
+ try {
+ if (!writeOutputRowIfPossible(getOneOutputRow())) {
+ return false;
+ }
+ } catch (JsonInputException e) {
+ if (!handleJsonInputExceptionInProcessRow()) {
+ return false;
+ }
+ } catch (Exception e) {
+ if (!handleUnexpectedExceptionInProcessRow(e)) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ private void beginProcessRowIteration() throws HopException {
if (first) {
first = false;
prepareToRowProcessing(false);
@@ -121,58 +141,57 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
BaseMessages.getString(PKG, "JsonInput.Log.ReceivingMultiRows"));
}
}
+ }
- Object[] outRow;
- try {
- // Grab a row
- outRow = getOneOutputRow();
- if (outRow == null) {
- setOutputDone(); // signal end to receiver(s)
- return false; // end of data or error.
- }
-
- if (isRowLevel()) {
- logRowlevel(
- BaseMessages.getString(
- PKG, "JsonInput.Log.ReadRow",
data.outputRowMeta.getString(outRow)));
- }
- incrementLinesInput();
- data.rownr++;
-
- putRow(data.outputRowMeta, outRow); // copy row to output rowset(s)
-
- if (meta.getRowLimit() > 0 && data.rownr > meta.getRowLimit()) {
- // limit has been reached: stop now.
- setOutputDone();
- return false;
- }
+ private boolean writeOutputRowIfPossible(Object[] outRow) throws
HopException {
+ if (outRow == null) {
+ setOutputDone();
+ return false;
+ }
+ if (isRowLevel()) {
+ logRowlevel(
+ BaseMessages.getString(
+ PKG, "JsonInput.Log.ReadRow",
data.outputRowMeta.getString(outRow)));
+ }
+ incrementLinesInput();
+ data.rownr++;
+ putRow(data.outputRowMeta, outRow);
+ if (meta.getRowLimit() > 0 && data.rownr > meta.getRowLimit()) {
+ setOutputDone();
+ return false;
+ }
+ return true;
+ }
- } catch (JsonInputException e) {
- if (!getTransformMeta().isDoingErrorHandling()) {
- stopErrorExecution(e);
- return false;
- }
- } catch (Exception e) {
- logError(BaseMessages.getString(PKG,
"JsonInput.ErrorInTransformRunning", e.getMessage()));
- if (getTransformMeta().isDoingErrorHandling()) {
- sendErrorRow(e.toString());
- } else {
- incrementErrors();
- stopErrorExecution(e);
- return false;
- }
+ private boolean handleJsonInputExceptionInProcessRow() {
+ if (!getTransformMeta().isDoingErrorHandling()) {
+ stopErrorExecution();
+ return false;
}
return true;
}
- private void stopErrorExecution(Exception e) {
+ /**
+ * @return false if execution must stop
+ */
+ private boolean handleUnexpectedExceptionInProcessRow(Exception e) {
+ logError(BaseMessages.getString(PKG, "JsonInput.ErrorInTransformRunning",
e.getMessage()));
+ if (getTransformMeta().isDoingErrorHandling()) {
+ sendErrorRow(e.toString());
+ return true;
+ }
+ incrementErrors();
+ stopErrorExecution();
+ return false;
+ }
+
+ private void stopErrorExecution() {
stopAll();
setOutputDone();
}
@Override
- protected void prepareToRowProcessing(boolean errorIgnored)
- throws HopException, HopTransformException, HopValueException {
+ protected void prepareToRowProcessing(boolean errorIgnored) throws
HopException {
data.readrow = getRow();
data.inputRowMeta = getInputRowMeta();
if (!meta.isInFields() && data.inputRowMeta == null) {
@@ -264,18 +283,20 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
inputError(errMsg);
return false;
}
+
+ if (file.getContent().getSize() == 0 && meta.isIgnoringEmptyFile()) {
+ logBasic(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", ""
+ file.getName()));
+ // Skip opening/parsing this file; InputsReader will advance to the next
file.
+ data.skipEmptyFile = true;
+ return false;
+ }
if (hasAdditionalFileFields()) {
fillFileAdditionalFields(data, file);
}
if (file.getContent().getSize() == 0) {
- // log only basic as a warning (was before logError)
- if (meta.isIgnoringEmptyFile()) {
- logBasic(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero",
"" + file.getName()));
- } else {
- logError(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero",
"" + file.getName()));
- incrementErrors();
- return false;
- }
+ logError(BaseMessages.getString(PKG, "JsonInput.Error.FileSizeZero", ""
+ file.getName()));
+ incrementErrors();
+ return false;
}
return true;
}
@@ -293,22 +314,30 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
}
private void parseNextInputToRowSet(InputStream input) throws HopException {
- try {
- data.readerRowSet = data.reader.parseStringValue(input);
+ // Large pushback buffer: some streams pre-read; avoids buffer overflow on
unread(1).
+ try (PushbackInputStream pb = new PushbackInputStream(input, 8192)) {
+ if (meta.isIgnoringEmptyFile()) {
+ int first = pb.read();
+ if (first < 0) {
+ data.readerRowSet = data.jsonReader.emptyFieldRowSet();
+ return;
+ }
+ pb.unread(first);
+ input = pb;
+ }
+ data.readerRowSet = data.jsonReader.parseStringValue(input);
} catch (HopException ke) {
logInputError(ke);
throw new JsonInputException(ke);
} catch (Exception e) {
logInputError(e);
throw new JsonInputException(e);
- } finally {
- closeQuietly(input);
}
}
private void parseNextJsonToRowSet(JsonNode node) throws HopException {
try {
- data.readerRowSet = data.reader.parseJsonNodeValue(node);
+ data.readerRowSet = data.jsonReader.parseJsonNodeValue(node);
} catch (Exception e) {
logInputError(e);
throw new JsonInputException(e);
@@ -395,7 +424,7 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
}
/** get final row for output */
- private Object[] getOneOutputRow() throws HopException {
+ private Object[] getOneOutputRow() throws HopException, IOException {
if (meta.isInFields() && !data.hasFirstRow) {
return null;
}
@@ -427,7 +456,7 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
InputStream nextIn = data.inputs.next();
if (nextIn != null) {
CountingInputStream countingIn = new CountingInputStream(nextIn);
- try {
+ try (countingIn) {
parseNextInputToRowSet(countingIn);
} finally {
long bytesRead = countingIn.getCount();
@@ -450,7 +479,13 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
BaseTransform.closeQuietly(countingIn);
}
} else {
- parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+ // Null stream (e.g. null field, iterator error path): do not
parse "{}" when ignoring
+ // empty input — that would still run JSONPath and fail if paths
are missing.
+ if (meta.isIgnoringEmptyFile()) {
+ data.readerRowSet = data.jsonReader.emptyFieldRowSet();
+ } else {
+ parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+ }
}
} else {
if (isDetailed()) {
@@ -551,7 +586,7 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
}
// Add RootUri
if (!Utils.isEmpty(meta.getRootUriField())) {
- outputRowData[rowIndex++] = data.rootUriName;
+ outputRowData[rowIndex] = data.rootUriName;
}
}
@@ -565,8 +600,9 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
inputFields[i] = field;
}
// Instead of putting in the meta.inputFields, we put in our json path
resolved input fields
- data.reader = new FastJsonReader(inputFields,
meta.isDefaultPathLeafToNull(), getLogChannel());
- data.reader.setIgnoreMissingPath(meta.isIgnoringMissingPath());
+ data.jsonReader =
+ new FastJsonReader(inputFields, meta.isDefaultPathLeafToNull(),
getLogChannel());
+ data.jsonReader.setIgnoreMissingPath(meta.isIgnoringMissingPath());
}
@Override
@@ -575,7 +611,7 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
closeQuietly(data.file);
}
data.inputs = null;
- data.reader = null;
+ data.jsonReader = null;
data.readerRowSet = null;
data.repeatedFields = null;
super.dispose();
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
index a7abee5571..77c82ef72f 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
@@ -49,11 +49,20 @@ public class JsonInputData extends
BaseFileInputTransformData implements ITransf
public int indexSourceField;
public Iterator<InputStream> inputs;
- public Iterator<JsonNode> jsonInputs; // if incoming field is JsonNode
- public IJsonReader reader;
+
+ /** if incoming field is JsonNode */
+ public Iterator<JsonNode> jsonInputs;
+
+ public IJsonReader jsonReader;
public IRowSet readerRowSet;
public BitSet repeatedFields;
+ /**
+ * When {@link JsonInput#onNewFile} rejects an empty file with "ignore empty
file" enabled, the
+ * file iterator loops to the next file instead of substituting placeholder
JSON.
+ */
+ public boolean skipEmptyFile;
+
public JsonInputData() {
super();
nr_repeats = 0;
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
index 294dd9e4bc..1f91a5d506 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
@@ -183,6 +183,11 @@ public class FastJsonReader implements IJsonReader {
this.paths = compilePaths(fields);
}
+ @Override
+ public IRowSet emptyFieldRowSet() {
+ return getEmptyResponse();
+ }
+
@Override
public IRowSet parseStringValue(InputStream in) throws HopException {
readInput(in);
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
index 11836521fa..cd9f1a9e68 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
@@ -41,4 +41,10 @@ public interface IJsonReader {
/** parse incoming JsonNode fields into a rowset */
public IRowSet parseJsonNodeValue(com.fasterxml.jackson.databind.JsonNode
in) throws HopException;
+
+ /**
+ * One logical output row with a null for each configured field, without
reading a JSON document
+ * or evaluating JSON paths (used when input is an empty stream and empty
files are ignored).
+ */
+ IRowSet emptyFieldRowSet();
}
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
index e5202e3bd7..a5749b6033 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
@@ -147,7 +147,7 @@ public class InputsReader implements Iterable<InputStream> {
@Override
public InputStream tryNext() {
- if (hasNext()) {
+ while (hasNext()) {
if (data.file != null) {
try {
data.file.close();
@@ -161,8 +161,14 @@ public class InputsReader implements Iterable<InputStream>
{
if (transform.onNewFile(data.file)) {
return HopVfs.getInputStream(data.file);
}
+ if (data instanceof JsonInputData jsonData &&
jsonData.skipEmptyFile) {
+ jsonData.skipEmptyFile = false;
+ continue;
+ }
+ return null;
} catch (FileSystemException e) {
handler.fileOpenError(data.file, e);
+ return null;
}
}
return null;
diff --git
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
new file mode 100644
index 0000000000..844b4fbcad
--- /dev/null
+++
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputDataTests.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.jsoninput;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import org.junit.jupiter.api.Test;
+
+/** Unit test for {@link JsonInputData} */
+class JsonInputDataTests {
+
+ @Test
+ void constructorInitializesTransformDataDefaults() {
+ JsonInputData data = new JsonInputData();
+
+ assertEquals(0, data.nr_repeats);
+ assertNull(data.previousRow);
+ assertEquals(0, data.filenr);
+ assertEquals(-1, data.indexSourceField);
+ assertEquals(-1, data.nrInputFields);
+ assertNull(data.readrow);
+ assertEquals(0, data.totalpreviousfields);
+ assertFalse(data.hasFirstRow);
+ assertEquals(0L, data.rownr);
+ }
+
+ @Test
+ void skipEmptyFileDefaultsToFalse() {
+ JsonInputData data = new JsonInputData();
+ assertFalse(data.skipEmptyFile);
+ }
+
+ @Test
+ void skipEmptyFileIsMutableFlagForFileIterator() {
+ JsonInputData data = new JsonInputData();
+ data.skipEmptyFile = true;
+ assertTrue(data.skipEmptyFile);
+ data.skipEmptyFile = false;
+ assertFalse(data.skipEmptyFile);
+ }
+}
diff --git
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
index 4bc86e2a47..32767af2c1 100644
---
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
+++
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
@@ -41,7 +41,6 @@ import java.util.zip.ZipOutputStream;
import org.apache.commons.io.IOUtils;
import org.apache.commons.vfs2.FileObject;
import org.apache.commons.vfs2.FileSystemException;
-import org.apache.commons.vfs2.util.FileObjectUtils;
import org.apache.hop.core.HopClientEnvironment;
import org.apache.hop.core.IRowSet;
import org.apache.hop.core.exception.HopException;
@@ -78,71 +77,73 @@ class JsonInputTest {
protected static String getBasicTestJson() {
return """
- {
- "home": {},
- "store": {
- "book": [
- {
- "category": "reference",
- "author": "Nigel Rees",
- "title": "Sayings of the Century",
- "price": 8.95
- },
- {
- "category": "fiction",
- "author": "Evelyn Waugh",
- "title": "Sword of Honour",
- "price": 12.99
- },
- {
- "category": "fiction",
- "author": "Herman Melville",
- "title": "Moby Dick",
- "isbn": "0-553-21311-3",
- "price": 8.99
- },
- {
- "category": "fiction",
- "author": "J. R. R. Tolkien",
- "title": "The Lord of the Rings",
- "isbn": "0-395-19395-8",
- "price": 22.99
- }
- ],
- "bicycle": {
- "color": "red",
- "price": 19.95
- }
- }
- }""";
+ {
+ "home": {},
+ "store": {
+ "book": [
+ {
+ "category": "reference",
+ "author": "Nigel Rees",
+ "title": "Sayings of the Century",
+ "price": 8.95
+ },
+ {
+ "category": "fiction",
+ "author": "Evelyn Waugh",
+ "title": "Sword of Honour",
+ "price": 12.99
+ },
+ {
+ "category": "fiction",
+ "author": "Herman Melville",
+ "title": "Moby Dick",
+ "isbn": "0-553-21311-3",
+ "price": 8.99
+ },
+ {
+ "category": "fiction",
+ "author": "J. R. R. Tolkien",
+ "title": "The Lord of the Rings",
+ "isbn": "0-395-19395-8",
+ "price": 22.99
+ }
+ ],
+ "bicycle": {
+ "color": "red",
+ "price": 19.95
+ }
+ }
+ }""";
}
private static String getPDI17060Json() {
- return "{"
- + " \"path\":
\"/board/offer-sources/phases/current/cards/acquisitions\","
- + " \"id\": \"acquisitions\","
- + " \"template\": \"offer-sources\","
- + " \"creator\": \"admin\","
- + " \"created\": 1491703768197,"
- + " \"modifiedby\": null,"
- + " \"modified\": null,"
- + " \"color\": \"blue\","
- + " \"fields\": {"
- + " \"group-detail\": \"Offer Source Details\","
- + " \"name\": \"Acquisitions\""
- + " },"
- + " \"tasks\": 0,"
- + " \"history\": 1,"
- + " \"attachments\": 0,"
- + " \"comments\": 0,"
- + " \"alerts\": 0,"
- + " \"title\": \"Acquisitions\","
- + " \"lock\": null,"
- + " \"completeTasks\": null,"
- + " \"phase\": \"current\","
- + " \"errors\": null,"
- + " \"board\": \"offer-sources\""
- + "}";
+ return """
+ {
+ "path": "/board/offer-sources/phases/current/cards/acquisitions",
+ "id": "acquisitions",
+ "template": "offer-sources",
+ "creator": "admin",
+ "created": 1491703768197,
+ "modifiedby": null,
+ "modified": null,
+ "color": "blue",
+ "fields": {
+ "group-detail": "Offer Source Details",
+ "name": "Acquisitions"
+ },
+ "tasks": 0,
+ "history": 1,
+ "attachments": 0,
+ "comments": 0,
+ "alerts": 0,
+ "title": "Acquisitions",
+ "lock": null,
+ "completeTasks": null,
+ "phase": "current",
+ "errors": null,
+ "board": "offer-sources"
+ }
+ """;
}
@BeforeAll
@@ -521,18 +522,16 @@ class JsonInputTest {
@Test
void testIfIgnorePathDoNotSkipRowIfInputIsNullOrFieldNotFound() throws
Exception {
- final String input1 =
- """
- { "value1": "1",
- "value2": "2",
- }""";
+ final String input1 = """
+ { "value1": "1",
+ "value2": "2",
+ }""";
final String input2 = "{ \"value1\": \"3\"" + "}";
final String input3 = "{ \"value2\": \"4\"" + "}";
- final String input4 =
- """
- { "value1": null,
- "value2": null,
- }""";
+ final String input4 = """
+ { "value1": null,
+ "value2": null,
+ }""";
final String input5 = "{}";
final String input6 = null;
@@ -618,11 +617,10 @@ class JsonInputTest {
@Test
void testPathMissingIgnore() throws Exception {
- final String input =
- """
- { "value1": "1",
- "value2": "2",
- }""";
+ final String input = """
+ { "value1": "1",
+ "value2": "2",
+ }""";
final String inCol = "input";
JsonInputField aField = new JsonInputField();
@@ -1005,11 +1003,6 @@ class JsonInputTest {
}
}
- String json =
- FileObjectUtils.getContentAsString(
- HopVfs.getFileObject("zip:" + BASE_RAM_DIR +
"test.zip!/test.json"),
- StandardCharsets.UTF_8);
-
JsonInputField price = new JsonInputField();
price.setName("price");
price.setType(IValueMeta.TYPE_NUMBER);
@@ -1141,6 +1134,48 @@ class JsonInputTest {
}
}
+ @Test
+ void testZeroSizeFileIgnoredThenReadsNext() throws Exception {
+ helper.redirectLog(new ByteArrayOutputStream(), LogLevel.BASIC);
+ try (LocaleChange enUs = new LocaleChange(Locale.US)) {
+ FileObject empty = HopVfs.getFileObject(BASE_RAM_DIR + "empty.json");
+ empty.createFile();
+ FileObject good = HopVfs.getFileObject(BASE_RAM_DIR + "good.json");
+ try (OutputStream os = good.getContent().getOutputStream()) {
+ IOUtils.write(getBasicTestJson(), os, StandardCharsets.UTF_8);
+ }
+
+ JsonInputField price = new JsonInputField();
+ price.setName("price");
+ price.setType(IValueMeta.TYPE_NUMBER);
+ price.setPath("$..book[*].price");
+
+ JsonInputMeta meta = createSimpleMeta("in file", price);
+ meta.setIsAFile(true);
+ meta.setRemoveSourceField(true);
+ meta.setIgnoringEmptyFile(true);
+ meta.setIgnoringMissingPath(false);
+
+ JsonInput jsonInput =
+ createJsonInput(
+ "in file",
+ meta,
+ new Object[][] {
+ new Object[] {BASE_RAM_DIR + "empty.json"},
+ new Object[] {BASE_RAM_DIR + "good.json"}
+ });
+ RowComparatorListener rowComparator =
+ new RowComparatorListener(
+ new Object[] {8.95}, new Object[] {12.99}, new Object[] {8.99},
new Object[] {22.99});
+ jsonInput.addRowListener(rowComparator);
+ processRows(jsonInput, 8);
+ assertEquals(0, jsonInput.getErrors());
+ assertEquals(4, jsonInput.getLinesWritten());
+ } finally {
+ deleteFiles();
+ }
+ }
+
@Test
void testBracketEscape() throws Exception {
String input = "{\"a\":1,\"b(1)\":2}";
@@ -1201,7 +1236,7 @@ class JsonInputTest {
});
processRows(jsonInput, 3);
assertEquals(1, errorLines.size(), "fwd error");
- assertEquals(input1, errorLines.get(0)[0], "input in err line");
+ assertEquals(input1, errorLines.getFirst()[0], "input in err line");
assertEquals(1, jsonInput.getLinesWritten(), "rows written");
}
@@ -1463,7 +1498,7 @@ class JsonInputTest {
}
/** compare json (deep equals ignoring order) */
- protected static final boolean jsonEquals(String json1, String json2) throws
Exception {
+ protected static boolean jsonEquals(String json1, String json2) throws
Exception {
ObjectMapper om = HopJson.newMapper();
JsonNode parsedJson1 = om.readTree(json1);
JsonNode parsedJson2 = om.readTree(json2);
@@ -1503,11 +1538,10 @@ class JsonInputTest {
b.setPath("$..B.F2");
b.setType(IValueMeta.TYPE_STRING);
// Create two meta inputs with two different orders a,b and b,a
- List results = new ArrayList<>();
+ List<Object> results = new ArrayList<>();
List<JsonInputMeta> metas =
Arrays.asList(createSimpleMeta("json", a, b), createSimpleMeta("json",
b, a));
for (JsonInputMeta meta : metas) {
- JsonInputMeta metaAB = createSimpleMeta("json", a, b);
JsonInput jsonInput =
createJsonInput(
"json", meta, new Object[] {"{'B':{'F2': one}, 'C':{'B': {'F2':
three}}}"});
diff --git
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
index 254a81e320..7f44079b78 100644
---
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
+++
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReaderTest.java
@@ -18,7 +18,9 @@
package org.apache.hop.pipeline.transforms.jsoninput.reader;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
import com.jayway.jsonpath.Option;
@@ -40,24 +42,24 @@ class FastJsonReaderTest {
private static final Option[] OPTIONS_WO_DEFAULT_PATH_LEAF_TO_NULL = {
Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST
};
- private EnumSet<Option> expectedOptions = EnumSet.noneOf(Option.class);
+ private final EnumSet<Option> expectedOptions = EnumSet.noneOf(Option.class);
private JsonInputField[] fields;
private FastJsonReader fJsonReader;
- private ILogChannel logMock = mock(ILogChannel.class);
+ private final ILogChannel logMock = mock(ILogChannel.class);
@BeforeEach
- void setUp() throws Exception {
+ void setUp() {
fields = new JsonInputField[] {};
}
@Test
- void testFastJsonReaderCreated_Default() throws HopException {
+ void testFastJsonReaderCreated_Default() {
fJsonReader = new FastJsonReader(logMock);
expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
assertNotNull(fJsonReader);
- assertEquals(false, fJsonReader.isIgnoreMissingPath());
- assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+ assertFalse(fJsonReader.isIgnoreMissingPath());
+ assertTrue(fJsonReader.isDefaultPathLeafToNull());
assertEquals(expectedOptions,
fJsonReader.getJsonConfiguration().getOptions());
}
@@ -66,8 +68,8 @@ class FastJsonReaderTest {
expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
fJsonReader = new FastJsonReader(fields, logMock);
assertNotNull(fJsonReader);
- assertEquals(false, fJsonReader.isIgnoreMissingPath());
- assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+ assertFalse(fJsonReader.isIgnoreMissingPath());
+ assertTrue(fJsonReader.isDefaultPathLeafToNull());
assertEquals(expectedOptions,
fJsonReader.getJsonConfiguration().getOptions());
}
@@ -76,8 +78,8 @@ class FastJsonReaderTest {
expectedOptions.addAll(Arrays.asList(OPTIONS_WO_DEFAULT_PATH_LEAF_TO_NULL));
fJsonReader = new FastJsonReader(fields, false, logMock);
assertNotNull(fJsonReader);
- assertEquals(false, fJsonReader.isIgnoreMissingPath());
- assertEquals(false, fJsonReader.isDefaultPathLeafToNull());
+ assertFalse(fJsonReader.isIgnoreMissingPath());
+ assertFalse(fJsonReader.isDefaultPathLeafToNull());
assertEquals(expectedOptions,
fJsonReader.getJsonConfiguration().getOptions());
}
@@ -86,13 +88,13 @@ class FastJsonReaderTest {
expectedOptions.addAll(Arrays.asList(DEFAULT_OPTIONS));
fJsonReader = new FastJsonReader(fields, true, logMock);
assertNotNull(fJsonReader);
- assertEquals(false, fJsonReader.isIgnoreMissingPath());
- assertEquals(true, fJsonReader.isDefaultPathLeafToNull());
+ assertFalse(fJsonReader.isIgnoreMissingPath());
+ assertTrue(fJsonReader.isDefaultPathLeafToNull());
assertEquals(expectedOptions,
fJsonReader.getJsonConfiguration().getOptions());
}
@Test
- void testFastJsonReaderGetMaxRowSize() throws HopException {
+ void testFastJsonReaderGetMaxRowSize() {
List<List<Integer>> mainList = new ArrayList<>();
List<Integer> l1 = new ArrayList<>();
List<Integer> l2 = new ArrayList<>();
diff --git
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
new file mode 100644
index 0000000000..2e5d9cb919
--- /dev/null
+++
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReaderTests.java
@@ -0,0 +1,586 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.pipeline.transforms.jsoninput.reader;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.File;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.vfs2.FileObject;
+import org.apache.commons.vfs2.FileSystemException;
+import org.apache.hop.core.exception.HopPluginException;
+import org.apache.hop.core.fileinput.FileInputList;
+import org.apache.hop.core.json.HopJson;
+import org.apache.hop.core.logging.ILoggingObject;
+import org.apache.hop.core.row.IRowMeta;
+import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaFactory;
+import org.apache.hop.core.variables.IVariables;
+import org.apache.hop.core.vfs.HopVfs;
+import org.apache.hop.junit.rules.RestoreHopEngineEnvironmentExtension;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInput;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputData;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputField;
+import org.apache.hop.pipeline.transforms.jsoninput.JsonInputMeta;
+import org.apache.hop.pipeline.transforms.mock.TransformMockHelper;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.ExtendWith;
+import org.mockito.Mockito;
+
+/**
+ * Unit tests for {@link InputsReader}. Placed in this package to exercise
protected {@link
+ * InputsReader#getFieldIterator()} and nested iterators.
+ */
+@ExtendWith(RestoreHopEngineEnvironmentExtension.class)
+class InputsReaderTests {
+
+ private static final String BASE_RAM_DIR = "ram:///inputsReaderTest/";
+ private static final String BASIC_JSON =
+ "{\"store\":{\"book\":[{\"price\":1.0},{\"price\":2.0}]}}";
+
+ private TransformMockHelper<JsonInputMeta, JsonInputData> helper;
+
+ @BeforeEach
+ void setUp() {
+ helper =
+ new TransformMockHelper<>("inputs reader test", JsonInputMeta.class,
JsonInputData.class);
+ when(helper.logChannelFactory.create(any(), any(ILoggingObject.class)))
+ .thenReturn(helper.iLogChannel);
+ when(helper.pipeline.isRunning()).thenReturn(true);
+ }
+
+ @AfterEach
+ void tearDown() throws Exception {
+ helper.cleanUp();
+ try (FileObject baseDir = HopVfs.getFileObject(BASE_RAM_DIR)) {
+ baseDir.deleteAll();
+ }
+ }
+
+ private static final class RecordingErrorHandler implements
InputsReader.ErrorHandler {
+ final List<Exception> errors = new ArrayList<>();
+ final List<FileSystemException> fileOpenErrors = new ArrayList<>();
+ final List<FileSystemException> fileCloseErrors = new ArrayList<>();
+
+ @Override
+ public void error(Exception thrown) {
+ errors.add(thrown);
+ }
+
+ @Override
+ public void fileOpenError(FileObject file, FileSystemException exception) {
+ fileOpenErrors.add(exception);
+ }
+
+ @Override
+ public void fileCloseError(FileObject file, FileSystemException exception)
{
+ fileCloseErrors.add(exception);
+ }
+ }
+
+ /** JsonInput.init() requires at least one field so FastJsonReader can be
built. */
+ private static void ensureMinimalInputField(JsonInputMeta meta) {
+ if (meta.getInputFields().isEmpty()) {
+ JsonInputField f = new JsonInputField();
+ f.setName("_");
+ f.setPath("$");
+ f.setType(IValueMeta.TYPE_STRING);
+ meta.getInputFields().add(f);
+ }
+ }
+
+ private JsonInputMeta createFileListMeta(final List<FileObject> files) {
+ JsonInputMeta meta =
+ new JsonInputMeta() {
+ @Override
+ public FileInputList getFileInputList(IVariables variables) {
+ return new FileInputList() {
+ @Override
+ public List<FileObject> getFiles() {
+ return files;
+ }
+
+ @Override
+ public int nrOfFiles() {
+ return files.size();
+ }
+ };
+ }
+ };
+ meta.setDefault();
+ meta.setInFields(false);
+ ensureMinimalInputField(meta);
+ return meta;
+ }
+
+ private JsonInput newJsonInputWithoutInputRows(JsonInputMeta meta,
JsonInputData data) {
+ ensureMinimalInputField(meta);
+ JsonInput jsonInput =
+ new JsonInput(helper.transformMeta, meta, data, 0,
helper.pipelineMeta, helper.pipeline);
+ jsonInput.init();
+ return jsonInput;
+ }
+
+ private JsonInput newJsonInputWithStringRows(
+ JsonInputMeta meta, JsonInputData data, String columnName, Object[]...
rows)
+ throws HopPluginException {
+ ensureMinimalInputField(meta);
+ JsonInput jsonInput =
+ new JsonInput(helper.transformMeta, meta, data, 0,
helper.pipelineMeta, helper.pipeline);
+ IRowMeta rowMeta = new RowMeta();
+ rowMeta.addValueMeta(ValueMetaFactory.createValueMeta(columnName,
IValueMeta.TYPE_STRING));
+ jsonInput.setInputRowMeta(rowMeta);
+ jsonInput.addRowSetToInputRowSets(helper.getMockInputRowSet(rows));
+ jsonInput.init();
+ return jsonInput;
+ }
+
+ private JsonInput newJsonInputWithJsonRows(
+ JsonInputMeta meta, JsonInputData data, Object[]... rows) throws
HopPluginException {
+ ensureMinimalInputField(meta);
+ JsonInput jsonInput =
+ new JsonInput(helper.transformMeta, meta, data, 0,
helper.pipelineMeta, helper.pipeline);
+ IRowMeta rowMeta = new RowMeta();
+ rowMeta.addValueMeta(ValueMetaFactory.createValueMeta("j",
IValueMeta.TYPE_JSON));
+ jsonInput.setInputRowMeta(rowMeta);
+ jsonInput.addRowSetToInputRowSets(helper.getMockInputRowSet(rows));
+ jsonInput.init();
+ return jsonInput;
+ }
+
+ @Test
+ void iterator_staticFileList_returnsStreamsForEachFile() throws Exception {
+ try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "a.json")) {
+ f1.createFile();
+ try (OutputStream os = f1.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ JsonInputMeta meta = createFileListMeta(List.of(f1));
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ try (InputStream in = it.next()) {
+ assertNotNull(in);
+ String body = IOUtils.toString(in, StandardCharsets.UTF_8);
+ assertTrue(body.contains("store"));
+ }
+ assertFalse(it.hasNext());
+ }
+ }
+
+ @Test
+ void iterator_cachesFileInputListOnData() throws Exception {
+ try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "cached.json")) {
+ f1.createFile();
+ try (OutputStream os = f1.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ AtomicInteger listBuilds = new AtomicInteger();
+ JsonInputMeta meta =
+ new JsonInputMeta() {
+ @Override
+ public FileInputList getFileInputList(IVariables variables) {
+ listBuilds.incrementAndGet();
+ return new FileInputList() {
+ @Override
+ public List<FileObject> getFiles() {
+ return List.of(f1);
+ }
+
+ @Override
+ public int nrOfFiles() {
+ return 1;
+ }
+ };
+ }
+ };
+ meta.setDefault();
+ meta.setInFields(false);
+ ensureMinimalInputField(meta);
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+
+ new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler()).iterator();
+ assertEquals(1, listBuilds.get());
+ assertNotNull(data.files);
+
+ new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler()).iterator();
+ assertEquals(1, listBuilds.get(), "Second iterator() must reuse
data.files");
+ }
+ }
+
+ @Test
+ void iterator_acceptingFilenames_resolvesPathsFromField() throws Exception {
+ try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR +
"fromfield.json")) {
+ f1.createFile();
+ try (OutputStream os = f1.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("pathcol");
+ meta.setIsAFile(true);
+ meta.getFileInput().setAcceptingFilenames(true);
+ meta.getFileInput().setAcceptingField("pathcol");
+
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput =
+ newJsonInputWithStringRows(meta, data, "pathcol", new Object[]
{f1.getName().getURI()});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ try (InputStream in = it.next()) {
+ assertNotNull(in);
+ assertTrue(IOUtils.toString(in,
StandardCharsets.UTF_8).contains("store"));
+ }
+ assertFalse(it.hasNext());
+ }
+ }
+
+ @Test
+ void iterator_directFieldContent_wrapsStringAsUtf8Stream() throws Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ meta.setInFields(true);
+ meta.setFieldValue("js");
+ meta.setSourceAFile(false);
+ meta.setReadUrl(false);
+
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "js", new
Object[] {BASIC_JSON});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ try (InputStream in = it.next()) {
+ assertEquals(BASIC_JSON, IOUtils.toString(in, StandardCharsets.UTF_8));
+ }
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ void iterator_readUrl_opensUrlStream() throws Exception {
+ File tmp = File.createTempFile("inputsreader", ".json");
+ tmp.deleteOnExit();
+ Files.writeString(tmp.toPath(), BASIC_JSON, StandardCharsets.UTF_8);
+ String url = tmp.toURI().toURL().toString();
+
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("loc");
+ meta.setSourceAFile(false);
+ meta.setReadUrl(true);
+
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "loc", new
Object[] {url});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ try (InputStream in = it.next()) {
+ assertTrue(IOUtils.toString(in,
StandardCharsets.UTF_8).contains("store"));
+ }
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ void iterator_readUrl_badUrl_reportsErrorAndReturnsNullStream() throws
Exception {
+ RecordingErrorHandler handler = new RecordingErrorHandler();
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("loc");
+ meta.setSourceAFile(false);
+ meta.setReadUrl(true);
+
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "loc", new
Object[] {"bogus"});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, handler);
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ assertNull(it.next());
+ assertEquals(1, handler.errors.size());
+ }
+
+ @Test
+ void iterator_skipsEmptyFileWhenConfigured_thenReturnsNextStream() throws
Exception {
+ try (FileObject empty = HopVfs.getFileObject(BASE_RAM_DIR + "empty.json");
+ FileObject good = HopVfs.getFileObject(BASE_RAM_DIR + "good.json")) {
+ empty.createFile();
+ good.createFile();
+ try (OutputStream os = good.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ JsonInputMeta meta = createFileListMeta(List.of(empty, good));
+ meta.setIgnoringEmptyFile(true);
+
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ try (InputStream in = it.next()) {
+ assertNotNull(in);
+ assertTrue(IOUtils.toString(in,
StandardCharsets.UTF_8).contains("store"));
+ }
+ assertFalse(it.hasNext());
+ assertFalse(data.skipEmptyFile);
+ }
+ }
+
+ @Test
+ void iterator_onNewFileRejectsWithoutSkip_returnsNull() throws Exception {
+ try (FileObject f1 = HopVfs.getFileObject(BASE_RAM_DIR + "reject.json")) {
+ f1.createFile();
+ try (OutputStream os = f1.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ JsonInputMeta meta = createFileListMeta(List.of(f1));
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+ JsonInput spyInput = spy(jsonInput);
+
Mockito.doReturn(false).when(spyInput).onNewFile(org.mockito.ArgumentMatchers.any());
+
+ InputsReader reader = new InputsReader(spyInput, meta, data, new
RecordingErrorHandler());
+ var it = reader.iterator();
+ assertTrue(it.hasNext());
+ assertNull(it.next());
+ assertFalse(it.hasNext());
+ }
+ }
+
+ @Test
+ void getFieldIterator_yieldsFieldStringsAndEnds() throws Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("c");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput =
+ newJsonInputWithStringRows(meta, data, "c", new Object[] {"a"}, new
Object[] {"b"});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<String> fields = reader.getFieldIterator();
+ assertTrue(fields.hasNext());
+ assertEquals("a", fields.next());
+ assertTrue(fields.hasNext());
+ assertEquals("b", fields.next());
+ assertFalse(fields.hasNext());
+ }
+
+ @Test
+ void getFieldIterator_rowShorterThanIndex_returnsNull() throws Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("c");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 1;
+ JsonInput jsonInput =
+ newJsonInputWithStringRows(meta, data, "c", new Object[]
{"only-one-col"});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<String> fields = reader.getFieldIterator();
+ assertTrue(fields.hasNext());
+ assertNull(fields.next());
+ assertFalse(fields.hasNext());
+ }
+
+ @Test
+ void getFieldIterator_nullCell_returnsNullString() throws Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("c");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "c", new
Object[] {null});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<String> fields = reader.getFieldIterator();
+ assertTrue(fields.hasNext());
+ assertNull(fields.next());
+ assertFalse(fields.hasNext());
+ }
+
+ @Test
+ void getFieldIterator_nonStringCell_throwsClassCastException() throws
Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("c");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "c", new
Object[] {42});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<String> fields = reader.getFieldIterator();
+ assertTrue(fields.hasNext());
+ assertThrows(ClassCastException.class, fields::next);
+ }
+
+ @Test
+ void jsonFieldIterator_yieldsJsonNodes() throws Exception {
+ ObjectMapper mapper = HopJson.newMapper();
+ JsonNode node = mapper.readTree(BASIC_JSON);
+
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("j");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithJsonRows(meta, data, new Object[]
{node});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<JsonNode> it = reader.jsonFieldIterator();
+ assertTrue(it.hasNext());
+ assertEquals(node, it.next());
+ assertFalse(it.hasNext());
+ }
+
+ @Test
+ void jsonFieldIterator_nonJsonCell_throwsClassCastException() throws
Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ ensureMinimalInputField(meta);
+ meta.setInFields(true);
+ meta.setFieldValue("j");
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput =
+ newJsonInputWithStringRows(meta, data, "j", new Object[]
{"not-json-node"});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<JsonNode> it = reader.jsonFieldIterator();
+ assertTrue(it.hasNext());
+ assertThrows(ClassCastException.class, it::next);
+ }
+
+ @Test
+ void iterators_removeUnsupported() throws Exception {
+ JsonInputMeta meta = new JsonInputMeta();
+ meta.setDefault();
+ meta.setInFields(true);
+ meta.setFieldValue("js");
+ meta.setSourceAFile(false);
+ meta.setReadUrl(false);
+ JsonInputData data = new JsonInputData();
+ data.indexSourceField = 0;
+ JsonInput jsonInput = newJsonInputWithStringRows(meta, data, "js", new
Object[] {"{}"});
+
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ Iterator<InputStream> iterator = reader.iterator();
+ assertThrows(UnsupportedOperationException.class, iterator::remove);
+
+ InputsReader.StringFieldIterator fieldIterator = reader.getFieldIterator();
+ assertThrows(UnsupportedOperationException.class, fieldIterator::remove);
+
+ Iterator<JsonNode> jsonNodeIterator = reader.jsonFieldIterator();
+ assertThrows(UnsupportedOperationException.class,
jsonNodeIterator::remove);
+ }
+
+ @Test
+ void iterator_inFieldsAndSourceIsFile_usesFileBranch() throws Exception {
+ final String path = BASE_RAM_DIR + "isFile.json";
+ try (FileObject writeTo = HopVfs.getFileObject(path)) {
+ writeTo.createFile();
+ try (OutputStream os = writeTo.getContent().getOutputStream()) {
+ os.write(BASIC_JSON.getBytes(StandardCharsets.UTF_8));
+ }
+ }
+
+ // New FileObject for the list so size/exists match what onNewFile +
getInputStream see (RAM
+ // VFS).
+ try (FileObject f1 = HopVfs.getFileObject(path)) {
+ assertTrue(f1.exists(), "file must exist before iterator");
+ assertTrue(f1.getContent().getSize() > 0, "file must be non-empty before
iterator");
+
+ JsonInputMeta meta = createFileListMeta(List.of(f1));
+ meta.setInFields(true);
+ // Use setIsAFile (not only Lombok setSourceAFile) so sourceAFile is set
on anonymous
+ // JsonInputMeta subclasses.
+ meta.setIsAFile(true);
+ meta.getFileInput().setAcceptingFilenames(false);
+ meta.setFieldValue("_");
+
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ var it = reader.iterator();
+ assertTrue(it.hasNext(), "!inFields || getIsAFile must be true to use
file list iterator");
+ try (InputStream in = it.next()) {
+ assertNotNull(in, "expected stream after onNewFile accepted the file");
+ assertTrue(IOUtils.toString(in,
StandardCharsets.UTF_8).contains("store"));
+ }
+ assertFalse(it.hasNext());
+ }
+ }
+
+ @Test
+ void iterator_whenNoFiles_hasNextFalse() {
+ JsonInputMeta meta = createFileListMeta(Collections.emptyList());
+ JsonInputData data = new JsonInputData();
+ JsonInput jsonInput = newJsonInputWithoutInputRows(meta, data);
+ InputsReader reader = new InputsReader(jsonInput, meta, data, new
RecordingErrorHandler());
+ assertFalse(reader.iterator().hasNext());
+ }
+}