This is an automated email from the ASF dual-hosted git repository.
hansva pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/hop.git
The following commit(s) were added to refs/heads/main by this push:
new 9574a84426 Implement JSON type support across core transforms #5755
(#5823)
9574a84426 is described below
commit 9574a844267e861ee7d94765272b63edd1abf3f2
Author: Matteo <[email protected]>
AuthorDate: Thu Oct 16 09:30:47 2025 +0200
Implement JSON type support across core transforms #5755 (#5823)
* Implement JSON type support across core transforms #5755
* Fix Round2 getCode
---
.../apache/hop/core/database/BaseDatabaseMeta.java | 33 ++-
.../java/org/apache/hop/core/row/IValueMeta.java | 7 +
.../apache/hop/core/row/value/ValueMetaBase.java | 17 +-
.../apache/hop/core/row/value/ValueMetaJson.java | 274 ++++++++++++++++++--
.../apache/hop/core/row/value/ValueMetaString.java | 29 +++
.../java/org/apache/hop/core/util/JsonUtil.java | 108 ++++++++
.../hop/core/row/value/ValueMetaJsonTest.java | 228 +++++++++++++++++
.../org/apache/hop/core/util/JsonUtilTest.java | 157 ++++++++++++
.../database/0033-postgres-json-insert-json.hpl | 114 +++++++++
.../database/0033-postgres-json-validate-json.hpl | 275 +++++++++++++++++++++
.../database/main-0033-postgres-json.hwf | 238 ++++++++++++++++++
.../json/0007-json-input-json-value.hpl | 235 ++++++++++++++++++
.../json/datasets/golden-json-input-json.csv | 4 +
.../json/main-0007-json-input-json-value.hwf | 80 ++++++
.../metadata/dataset/golden-json-input-json.json | 25 ++
.../unit-test/0007-json-input-json-value UNIT.json | 33 +++
.../mongo/tests/mongo-json/main-mongo-json.hwf | 163 ++++++++++++
.../mongo/tests/mongo-json/mongo-delete-json.hpl | 141 +++++++++++
.../mongo/tests/mongo-json/mongo-insert-json.hpl | 126 ++++++++++
.../mongo/tests/mongo-json/mongo-read-json.hpl | 207 ++++++++++++++++
.../calculator/calculations/math/Round2.java | 2 +-
.../pipeline/transforms/jsoninput/JsonInput.java | 90 +++++--
.../transforms/jsoninput/JsonInputData.java | 2 +
.../jsoninput/reader/FastJsonReader.java | 86 ++++++-
.../transforms/jsoninput/reader/IJsonReader.java | 5 +-
.../transforms/jsoninput/reader/InputsReader.java | 50 +++-
.../jsoninput/reader/RowOutputConverter.java | 4 +
.../transforms/jsoninput/JsonInputTest.java | 119 ++++++++-
.../apache/hop/mongo/wrapper/field/MongoField.java | 45 ++++
.../mongodbdelete/MongoDbDeleteData.java | 127 ++++++----
.../mongodbdelete/MongoDbDeleteDialog.java | 3 +
.../mongodboutput/MongoDbOutputData.java | 139 ++++++-----
.../mongodboutput/MongoDbOutputDialog.java | 3 +
.../java/org/apache/hop/uuid/ValueMetaUuid.java | 42 +---
34 files changed, 3003 insertions(+), 208 deletions(-)
diff --git
a/core/src/main/java/org/apache/hop/core/database/BaseDatabaseMeta.java
b/core/src/main/java/org/apache/hop/core/database/BaseDatabaseMeta.java
index 5226ab7637..b1403ac095 100644
--- a/core/src/main/java/org/apache/hop/core/database/BaseDatabaseMeta.java
+++ b/core/src/main/java/org/apache/hop/core/database/BaseDatabaseMeta.java
@@ -1946,22 +1946,33 @@ public abstract class BaseDatabaseMeta implements
Cloneable, IDatabase {
}
String typeName = rm.getColumnTypeName(index);
- // Most dbs expose uuid as "UUID", sql server (native) as
"UNIQUEIDENTIFIER"
- if ("uuid".equalsIgnoreCase(typeName) ||
"uniqueidentifier".equalsIgnoreCase(typeName)) {
- try {
+ if (typeName == null) {
+ return null;
+ }
- int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
+ typeName = typeName.toLowerCase();
+ try {
+ switch (typeName) {
+ // Most dbs expose uuid as "UUID", sql server (native) as
"UNIQUEIDENTIFIER"
+ case "uniqueidentifier":
+ case "uuid":
+ {
+ int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
- // Keep any existing metadata
- IValueMeta u = ValueMetaFactory.cloneValueMeta(v, uuidTypeId);
+ // Keep any existing metadata
+ IValueMeta u = ValueMetaFactory.cloneValueMeta(v, uuidTypeId);
- u.setLength(-1);
- u.setPrecision(-1);
+ u.setLength(-1);
+ u.setPrecision(-1);
- return u;
- } catch (HopPluginException ignore) {
- // UUID plugin not present
+ return u;
+ }
+ case "json":
+ case "jsonb":
+ return ValueMetaFactory.cloneValueMeta(v, IValueMeta.TYPE_JSON);
}
+ } catch (HopPluginException ignore) {
+ // plugin not present
}
return null;
}
diff --git a/core/src/main/java/org/apache/hop/core/row/IValueMeta.java
b/core/src/main/java/org/apache/hop/core/row/IValueMeta.java
index 3544bc1b35..072cb48d52 100644
--- a/core/src/main/java/org/apache/hop/core/row/IValueMeta.java
+++ b/core/src/main/java/org/apache/hop/core/row/IValueMeta.java
@@ -997,6 +997,13 @@ public interface IValueMeta extends Cloneable {
*/
boolean isNumeric();
+ /**
+ * Checks whether this Value is Json
+ *
+ * @return true if the value is Json
+ */
+ boolean isJson();
+
/**
* Return the type of a value in a textual form: "String", "Number",
"Integer", "Boolean", "Date",
* ...
diff --git
a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaBase.java
b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaBase.java
index 6a0f0bf594..7883e6e12a 100644
--- a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaBase.java
+++ b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaBase.java
@@ -76,6 +76,7 @@ import org.apache.hop.core.logging.ILogChannel;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.ValueDataUtil;
import org.apache.hop.core.util.EnvUtil;
+import org.apache.hop.core.util.JsonUtil;
import org.apache.hop.core.util.Utils;
import org.apache.hop.core.variables.IVariables;
import org.apache.hop.core.xml.XmlHandler;
@@ -3242,6 +3243,12 @@ public class ValueMetaBase implements IValueMeta {
return type == TYPE_BOOLEAN;
}
+ @Override
+ @JsonIgnore
+ public boolean isJson() {
+ return type == TYPE_JSON;
+ }
+
/**
* Checks whether or not this value is of type Serializable
*
@@ -3533,9 +3540,7 @@ public class ValueMetaBase implements IValueMeta {
if (jsonNode == null) {
outputStream.writeInt(-1);
} else {
- ObjectMapper objectMapper = new ObjectMapper();
- String string = objectMapper.writeValueAsString(jsonNode);
- byte[] chars = string.getBytes(StandardCharsets.UTF_8);
+ byte[] chars = JsonUtil.mapJsonToBytes(jsonNode);
outputStream.writeInt(chars.length);
outputStream.write(chars);
}
@@ -3575,8 +3580,7 @@ public class ValueMetaBase implements IValueMeta {
byte[] chars = new byte[inputLength];
inputStream.readFully(chars);
- ObjectMapper objectMapper = new ObjectMapper();
- return objectMapper.readTree(chars, 0, inputLength);
+ return JsonUtil.jsonMapper().readTree(chars, 0, inputLength);
}
protected byte[] readBinaryString(DataInputStream inputStream) throws
IOException {
@@ -4378,7 +4382,7 @@ public class ValueMetaBase implements IValueMeta {
}
}
- private int typeCompare(Object data1, Object data2) throws HopValueException
{
+ protected int typeCompare(Object data1, Object data2) throws
HopValueException {
int cmp = 0;
switch (getType()) {
case TYPE_STRING:
@@ -5557,6 +5561,7 @@ public class ValueMetaBase implements IValueMeta {
IValueMeta newV = null;
try {
+ // JSON type is handled here because JDBC reports it as java.sql.Types.OTHER (1111) when reading from SQL
newV = databaseMeta.getIDatabase().customizeValueFromSqlType(v, rm,
index);
} catch (SQLException e) {
throw new SQLException(e);
diff --git
a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaJson.java
b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaJson.java
index 2d71bff07f..198629dafa 100644
--- a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaJson.java
+++ b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaJson.java
@@ -18,9 +18,18 @@
package org.apache.hop.core.row.value;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.ObjectMapper;
+import java.io.IOException;
+import java.io.InputStream;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.database.DatabaseMeta;
+import org.apache.hop.core.database.IDatabase;
+import org.apache.hop.core.exception.HopDatabaseException;
import org.apache.hop.core.exception.HopValueException;
-import org.apache.hop.core.json.HopJson;
+import org.apache.hop.core.util.JsonUtil;
@ValueMetaPlugin(id = "11", name = "JSON", description = "JSON object", image
= "images/json.svg")
public class ValueMetaJson extends ValueMetaBase {
@@ -28,17 +37,6 @@ public class ValueMetaJson extends ValueMetaBase {
/** Do String serialization using pretty printing? */
private boolean prettyPrinting;
- @Override
- public int compare(Object data1, Object data2) throws HopValueException {
- JsonNode json1 = (JsonNode) data1;
- JsonNode json2 = (JsonNode) data2;
-
- String string1 = convertJsonToString(json1);
- String string2 = convertJsonToString(json2);
-
- return string1.compareTo(string2);
- }
-
public ValueMetaJson() {
this(null);
}
@@ -56,12 +54,7 @@ public class ValueMetaJson extends ValueMetaBase {
@Override
public String convertJsonToString(JsonNode jsonNode) throws
HopValueException {
try {
- ObjectMapper objectMapper = HopJson.newMapper();
- if (prettyPrinting) {
- return
objectMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode);
- } else {
- return objectMapper.writeValueAsString(jsonNode);
- }
+ return JsonUtil.mapJsonToString(jsonNode, prettyPrinting);
} catch (Exception e) {
throw new HopValueException("Error converting JSON value to String", e);
}
@@ -82,6 +75,231 @@ public class ValueMetaJson extends ValueMetaBase {
}
}
+  /**
+   * Converts a row value to a {@link JsonNode} according to this meta's storage type.
+   *
+   * <p>Used mainly to handle JSON conversion from Mongo: a NORMAL (or resolved INDEXED) value that
+   * is not already a {@link JsonNode} is mapped to a tree through {@link
+   * JsonUtil#mapObjectToJson(Object)}, while BINARY_STRING values are parsed directly from their
+   * JSON bytes. Any other type/storage combination is delegated to the superclass.
+   *
+   * @param object the raw value; {@code null} yields {@code null}
+   * @return the value as a JSON tree
+   * @throws HopValueException if the bytes cannot be parsed or the object cannot be mapped to JSON
+   */
+  @Override
+  public JsonNode getJson(Object object) throws HopValueException {
+    if (object == null) {
+      return null;
+    }
+
+    try {
+      if (type == TYPE_JSON) {
+        switch (storageType) {
+          case STORAGE_TYPE_INDEXED:
+            // Resolve the dictionary/indexed value, then fall through to NORMAL handling
+            object = index[((Integer) object)];
+          case STORAGE_TYPE_NORMAL:
+            if (object instanceof JsonNode node) {
+              return node;
+            }
+            // Convert Mongo structures to JsonNode
+            return JsonUtil.mapObjectToJson(object);
+          case STORAGE_TYPE_BINARY_STRING:
+            // Parse the JSON bytes directly (faster than building a String first)
+            return JsonUtil.parse((byte[]) object);
+        }
+      }
+    } catch (IOException e) {
+      throw new HopValueException("Error converting bytes to JSON (" + object + ")", e);
+    } catch (IllegalArgumentException e) {
+      // Object-to-tree mapping reports failures as IllegalArgumentException; surface them through
+      // the declared exception type instead of letting an unchecked exception escape
+      throw new HopValueException("Error converting value to JSON (" + object + ")", e);
+    }
+    // delegate
+    return super.getJson(object);
+  }
+
+  /**
+   * Returns the immediate field names of a node, sorted alphabetically.
+   *
+   * <ul>
+   *   <li>Only the top-level keys of the given node are considered: if a field contains another
+   *       object, that child's keys are NOT included in the result.
+   *   <li>Keys are sorted in ascending lexicographic order ({@link String#compareTo(String)}),
+   *       which gives a simple, deterministic ordering that does not depend on the order in which
+   *       the fields were added.
+   * </ul>
+   *
+   * @param node the node whose field names are collected
+   * @return the field names in ascending order
+   */
+  private String[] getSortedKeys(JsonNode node) {
+    String[] sortedKeys = new String[node.size()];
+    int count = 0;
+
+    for (var it = node.fieldNames(); it.hasNext(); ) {
+      sortedKeys[count++] = it.next();
+    }
+
+    // Library sort instead of a hand-rolled insertion sort: same natural String order, without
+    // the quadratic worst case on objects with many keys
+    java.util.Arrays.sort(sortedKeys, 0, count);
+    return sortedKeys;
+  }
+
+  /**
+   * Ranks a {@link JsonNode} by its node type so that values of different types can be ordered.
+   * A lower rank sorts first: null reference / NULL (0), MISSING (1), BINARY (2), STRING (3),
+   * NUMBER (4), BOOLEAN (5), ARRAY (6), OBJECT (7). The policy follows the one of PostgreSQL's
+   * JSONB with the addition of the binary type.
+   *
+   * @param n the node to classify, may be {@code null}
+   * @return the rank of the node's type
+   * @throws HopValueException for any node type not listed above
+   */
+  private static int kind(JsonNode n) throws HopValueException {
+    if (n == null) {
+      return 0;
+    }
+    final var nodeType = n.getNodeType();
+    switch (nodeType) {
+      case NULL:
+        return 0;
+      case MISSING:
+        return 1;
+      case BINARY:
+        return 2;
+      case STRING:
+        return 3;
+      case NUMBER:
+        return 4;
+      case BOOLEAN:
+        return 5;
+      case ARRAY:
+        return 6;
+      case OBJECT:
+        return 7;
+      default:
+        throw new HopValueException("Unsupported JsonNode type: " + nodeType);
+    }
+  }
+
+  /**
+   * Compares two JsonNode values. Type precedence is enforced via {@link #kind(JsonNode)}: nodes
+   * of different kinds are ordered by that precedence, nodes of the same kind are ordered by the
+   * comparison rule of that kind.
+   *
+   * <p>Note, object key order is irrelevant since sorted keys are used.
+   *
+   * @param a the left value, may be {@code null}
+   * @param b the right value, may be {@code null}
+   * @return a negative, zero or positive number when {@code a} sorts before, with or after {@code
+   *     b}
+   * @throws HopValueException if either node has an unsupported node type
+   */
+  protected int jsonCompare(JsonNode a, JsonNode b) throws HopValueException {
+    // First, compare by kind
+    int kindA = kind(a);
+    int kindB = kind(b);
+    if (kindA != kindB) {
+      return Integer.compare(kindA, kindB);
+    }
+
+    // kind() ranks a null reference together with a JSON null, so equal kinds with a null
+    // reference on either side means both values are "null": equal, and nothing to dereference
+    if (a == null || b == null) {
+      return 0;
+    }
+
+    return switch (a.getNodeType()) {
+      case NULL, MISSING -> 0;
+      case BINARY -> jsonCompareBinary(a, b);
+      case STRING -> a.textValue().compareTo(b.textValue());
+      case NUMBER -> a.decimalValue().compareTo(b.decimalValue());
+      case BOOLEAN -> Boolean.compare(a.booleanValue(), b.booleanValue());
+      case ARRAY -> jsonCompareArray(a, b);
+      case OBJECT -> jsonCompareObject(a, b);
+      default -> throw new HopValueException("Unsupported JsonNode type: " + a.getNodeType());
+    };
+  }
+
+  /**
+   * Orders two binary nodes by unsigned, byte-wise lexicographic comparison of their content; when
+   * one content is a prefix of the other, the shorter one sorts first.
+   *
+   * @param a the left binary node
+   * @param b the right binary node
+   * @return a negative, zero or positive number when {@code a} sorts before, with or after {@code
+   *     b}
+   */
+  private int jsonCompareBinary(JsonNode a, JsonNode b) throws HopValueException {
+    // Each node is read on its own so a failure on one side no longer discards the other side
+    return java.util.Arrays.compareUnsigned(binaryOrEmpty(a), binaryOrEmpty(b));
+  }
+
+  /** Returns the node's binary content, or an empty array when the node cannot expose it. */
+  private static byte[] binaryOrEmpty(JsonNode node) {
+    try {
+      byte[] content = node.binaryValue();
+      return content == null ? new byte[0] : content;
+    } catch (Exception ignored) {
+      // Best effort: if a node can't expose binary content, treat it as empty
+      return new byte[0];
+    }
+  }
+
+  /**
+   * Orders two array nodes: the array with fewer elements sorts first; arrays of equal length are
+   * ordered by the first pair of elements that differ.
+   *
+   * @param a the left array node
+   * @param b the right array node
+   * @return the comparison result, 0 when both arrays are element-wise equal
+   */
+  private int jsonCompareArray(JsonNode a, JsonNode b) throws HopValueException {
+    final int sizeA = a.size();
+    final int sizeB = b.size();
+
+    // A length difference decides immediately; otherwise walk the elements until one pair differs
+    int result = Integer.compare(sizeA, sizeB);
+    for (int pos = 0; result == 0 && pos < sizeA; pos++) {
+      result = jsonCompare(a.get(pos), b.get(pos));
+    }
+    return result;
+  }
+
+  /**
+   * Orders two object nodes: the object with fewer keys sorts first; objects with the same number
+   * of keys are walked in sorted key order, comparing each key name and then the value stored
+   * under it, and the first difference decides.
+   *
+   * @param a the left object node
+   * @param b the right object node
+   * @return the comparison result, 0 when both objects hold equal keys and values
+   */
+  private int jsonCompareObject(JsonNode a, JsonNode b) throws HopValueException {
+    final String[] namesA = getSortedKeys(a);
+    final String[] namesB = getSortedKeys(b);
+    final int count = namesA.length;
+
+    // object with (n) keys > object with (n - 1) keys
+    if (count != namesB.length) {
+      return Integer.compare(count, namesB.length);
+    }
+
+    int pos = 0;
+    while (pos < count) {
+      final String nameA = namesA[pos];
+      final String nameB = namesB[pos];
+
+      final int byName = nameA.compareTo(nameB);
+      if (byName != 0) {
+        return byName;
+      }
+
+      final int byValue = jsonCompare(a.get(nameA), b.get(nameB));
+      if (byValue != 0) {
+        return byValue;
+      }
+      pos++;
+    }
+
+    return 0;
+  }
+
+  /**
+   * Compares two values of this type by converting both to JSON trees with {@link
+   * #getJson(Object)} and ordering them with {@link #jsonCompare(JsonNode, JsonNode)}.
+   */
+  @Override
+  protected int typeCompare(Object object1, Object object2) throws HopValueException {
+    final JsonNode left = getJson(object1);
+    final JsonNode right = getJson(object2);
+    return jsonCompare(left, right);
+  }
+
+  /**
+   * Binds a JSON value to a prepared statement parameter.
+   *
+   * <p>A {@code null} value is bound as SQL NULL of type {@link Types#OTHER}. On Postgres variants
+   * the node is bound as an object of type {@link Types#OTHER}, which handles both JSON and JSONB
+   * columns; every other database receives the String form of the JSON.
+   *
+   * @throws HopDatabaseException if the value cannot be converted or bound
+   */
+  @Override
+  public void setPreparedStatementValue(
+      DatabaseMeta databaseMeta, PreparedStatement preparedStatement, int index, Object data)
+      throws HopDatabaseException {
+    try {
+      final JsonNode json = getJson(data);
+      if (json == null) {
+        preparedStatement.setNull(index, Types.OTHER);
+      } else if (databaseMeta.getIDatabase().isPostgresVariant()) {
+        // other dbs don't accept type OTHER
+        preparedStatement.setObject(index, json, Types.OTHER);
+      } else {
+        // generic fallback to String
+        preparedStatement.setString(index, convertJsonToString(json));
+      }
+    } catch (Exception e) {
+      final String message =
+          "Error setting JSON value #" + index + " [" + toStringMeta() + "] on prepared statement";
+      throw new HopDatabaseException(message, e);
+    }
+  }
+
+  /**
+   * Reads a JSON value from a result set column.
+   *
+   * <p>The column is first read as a binary stream and parsed directly. If that fails, the column
+   * is read again as an object and its textual form is parsed instead.
+   *
+   * @param iDatabase the database the result set comes from (not used by this implementation)
+   * @param resultSet the result set positioned on the row to read
+   * @param index the zero-based column index (the JDBC column is {@code index + 1})
+   * @return the parsed {@link JsonNode}, or {@code null} when the column holds no value
+   * @throws HopDatabaseException if both the stream read and the fallback read fail
+   */
+  @Override
+  public Object getValueFromResultSet(IDatabase iDatabase, ResultSet resultSet, int index)
+      throws HopDatabaseException {
+
+    // Try to convert the binary stream directly to JsonNode
+    try (InputStream in = resultSet.getBinaryStream(index + 1)) {
+      return JsonUtil.parse(in);
+
+    } catch (SQLException | IOException e) {
+      // fallback; read object, get string representation, convert to JsonNode
+      try {
+        Object o = resultSet.getObject(index + 1);
+        return JsonUtil.parseTextValue(o);
+
+      } catch (SQLException ex) {
+        // Report the failure of the fallback itself and keep the first failure attached to it
+        ex.addSuppressed(e);
+        throw new HopDatabaseException(
+            "Unable to get JSON value '"
+                + toStringMeta()
+                + "' from database resultset, index "
+                + index,
+            ex);
+      } catch (Exception exp) {
+        exp.addSuppressed(e);
+        throw new HopDatabaseException("Unable to read JSON value", exp);
+      }
+    }
+  }
+
@Override
public Object getNativeDataType(Object object) throws HopValueException {
return getJson(object);
@@ -107,4 +325,22 @@ public class ValueMetaJson extends ValueMetaBase {
public void setPrettyPrinting(boolean prettyPrinting) {
this.prettyPrinting = prettyPrinting;
}
+
+  /**
+   * Builds the column type definition for a JSON field: {@code JSONB} on Postgres variants, {@code
+   * JSON} everywhere else, optionally prefixed with the field name and followed by a line break.
+   *
+   * @param iDatabase the target database, used to detect Postgres variants
+   * @param tk not used by this implementation
+   * @param pk not used by this implementation
+   * @param useAutoIncrement not used by this implementation
+   * @param addFieldName whether to prefix the definition with the field name and a space
+   * @param addCr whether to append {@link Const#CR}
+   * @return the column definition fragment
+   */
+  @Override
+  public String getDatabaseColumnTypeDefinition(
+      IDatabase iDatabase,
+      String tk,
+      String pk,
+      boolean useAutoIncrement,
+      boolean addFieldName,
+      boolean addCr) {
+    final StringBuilder definition = new StringBuilder();
+    if (addFieldName) {
+      definition.append(getName()).append(' ');
+    }
+
+    // Postgres advises non-legacy applications to use JSONB instead of JSON
+    definition.append(iDatabase.isPostgresVariant() ? "JSONB" : "JSON");
+
+    if (addCr) {
+      definition.append(Const.CR);
+    }
+    return definition.toString();
+  }
}
diff --git
a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaString.java
b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaString.java
index 57bb84132b..5340a34e65 100644
--- a/core/src/main/java/org/apache/hop/core/row/value/ValueMetaString.java
+++ b/core/src/main/java/org/apache/hop/core/row/value/ValueMetaString.java
@@ -17,9 +17,11 @@
package org.apache.hop.core.row.value;
+import com.fasterxml.jackson.databind.JsonNode;
import java.util.Comparator;
import org.apache.hop.core.exception.HopValueException;
import org.apache.hop.core.row.IValueMeta;
+import org.apache.hop.core.util.JsonUtil;
@ValueMetaPlugin(id = "2", name = "String", description = "String", image =
"images/string.svg")
public class ValueMetaString extends ValueMetaBase {
@@ -40,6 +42,33 @@ public class ValueMetaString extends ValueMetaBase {
super(name, IValueMeta.TYPE_STRING, length, precision);
}
+  /**
+   * Parses a String value into a {@link JsonNode}.
+   *
+   * <p>For backward compatibility Hop often reads JSON as String; this turns the String/byte[]
+   * content of NORMAL, BINARY_STRING and (after index lookup) INDEXED storage into a JSON tree.
+   * Any other type/storage combination is delegated to the superclass.
+   *
+   * @param object the raw value; {@code null} yields {@code null}
+   * @return the parsed JSON tree
+   * @throws HopValueException if the value cannot be resolved or parsed as JSON
+   */
+  @Override
+  public JsonNode getJson(Object object) throws HopValueException {
+    if (object == null) {
+      return null;
+    }
+
+    final boolean parseable =
+        storageType == STORAGE_TYPE_INDEXED
+            || storageType == STORAGE_TYPE_NORMAL
+            || storageType == STORAGE_TYPE_BINARY_STRING;
+    if (type != TYPE_STRING || !parseable) {
+      // delegate
+      return super.getJson(object);
+    }
+
+    try {
+      if (storageType == STORAGE_TYPE_INDEXED) {
+        // Resolve the indexed value first, it is then parsed like a NORMAL one
+        object = index[(Integer) object];
+      }
+      // JsonUtil handles both the NORMAL (text) and BINARY_STRING (byte[]) representations
+      return JsonUtil.parseTextValue(object);
+    } catch (Exception e) {
+      throw new HopValueException("Error converting value to JSON (" + object + ")", e);
+    }
+  }
+
@Override
public Object getNativeDataType(Object object) throws HopValueException {
return getString(object);
diff --git a/core/src/main/java/org/apache/hop/core/util/JsonUtil.java
b/core/src/main/java/org/apache/hop/core/util/JsonUtil.java
new file mode 100644
index 0000000000..5a9befc1e4
--- /dev/null
+++ b/core/src/main/java/org/apache/hop/core/util/JsonUtil.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.core.util;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.ObjectReader;
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * Static helpers around two shared Jackson instances: a reader for parsing JSON text, bytes and
+ * streams into {@link JsonNode} trees, and a mapper for converting objects to trees and trees to
+ * text or bytes.
+ */
+public class JsonUtil {
+
+  /** Shared reader behind every {@code parse} method; it accepts unquoted field names. */
+  private static final ObjectReader jsonReader =
+      com.fasterxml.jackson.databind.json.JsonMapper.builder()
+          .enable(com.fasterxml.jackson.core.json.JsonReadFeature.ALLOW_UNQUOTED_FIELD_NAMES)
+          .build()
+          .reader();
+
+  /** Shared mapper used to turn objects into JSON trees and JSON trees into text or bytes. */
+  private static final ObjectMapper jsonMapper = new ObjectMapper();
+
+  /** Utility class, not meant to be instantiated. */
+  private JsonUtil() {}
+
+  /** Returns the shared reader; the same instance is returned on every call. */
+  public static ObjectReader jsonReader() {
+    return jsonReader;
+  }
+
+  /** Returns the shared mapper; the same instance is returned on every call. */
+  public static ObjectMapper jsonMapper() {
+    return jsonMapper;
+  }
+
+  /**
+   * Parses JSON bytes into a tree.
+   *
+   * @param bytes the JSON content, may be {@code null}
+   * @return the parsed tree, or {@code null} for {@code null} input
+   * @throws IOException if the content cannot be read as JSON
+   */
+  public static JsonNode parse(byte[] bytes) throws IOException {
+    return bytes == null ? null : jsonReader.readTree(bytes);
+  }
+
+  /**
+   * Parses JSON text into a tree.
+   *
+   * @param text the JSON content, may be {@code null}
+   * @return the parsed tree, or {@code null} for {@code null} input
+   * @throws JsonProcessingException if the text is not valid JSON
+   */
+  public static JsonNode parse(CharSequence text) throws JsonProcessingException {
+    return text == null ? null : jsonReader.readTree(text.toString());
+  }
+
+  /**
+   * Parses JSON read from a stream into a tree.
+   *
+   * @param in the stream to read, may be {@code null}
+   * @return the parsed tree, or {@code null} for {@code null} input
+   * @throws IOException if the stream cannot be read as JSON
+   */
+  public static JsonNode parse(InputStream in) throws IOException {
+    return in == null ? null : jsonReader.readTree(in);
+  }
+
+  /**
+   * Parses a value that carries JSON in textual form. Byte arrays, character sequences and input
+   * streams go to the matching {@code parse} overload; any other object is parsed from its {@code
+   * toString()} form.
+   *
+   * @param value the value to parse, may be {@code null}
+   * @return the parsed tree, or {@code null} for {@code null} input
+   * @throws Exception if the content cannot be parsed as JSON
+   */
+  public static JsonNode parseTextValue(Object value) throws Exception {
+    if (value == null) {
+      return null;
+    }
+    if (value instanceof byte[] raw) {
+      return parse(raw);
+    }
+    if (value instanceof CharSequence chars) {
+      return parse(chars);
+    }
+    if (value instanceof InputStream stream) {
+      return parse(stream);
+    }
+    final String rendered = value.toString();
+    return parse(rendered);
+  }
+
+  /**
+   * Converts an arbitrary object into a JSON tree using the shared mapper.
+   *
+   * @param object the object to convert, may be {@code null}
+   * @return {@code null} for {@code null} input, the object itself when it already is a {@link
+   *     JsonNode}, otherwise its tree representation
+   */
+  public static JsonNode mapObjectToJson(Object object) {
+    if (object instanceof JsonNode existing) {
+      return existing;
+    }
+    return object == null ? null : jsonMapper.valueToTree(object);
+  }
+
+  /**
+   * Serializes a JSON tree to a String.
+   *
+   * @param jsonNode the tree to serialize, may be {@code null}
+   * @param prettyPrinting whether to use the default pretty printer instead of compact output
+   * @return the JSON text, or {@code null} for {@code null} input
+   * @throws JsonProcessingException if serialization fails
+   */
+  public static String mapJsonToString(JsonNode jsonNode, boolean prettyPrinting)
+      throws JsonProcessingException {
+    if (jsonNode == null) {
+      return null;
+    }
+    return prettyPrinting
+        ? jsonMapper.writerWithDefaultPrettyPrinter().writeValueAsString(jsonNode)
+        : jsonMapper.writeValueAsString(jsonNode);
+  }
+
+  /**
+   * Serializes a JSON tree to bytes, encoded in UTF8.
+   *
+   * @param jsonNode the tree to serialize, may be {@code null}
+   * @return the JSON bytes, or {@code null} for {@code null} input
+   * @throws JsonProcessingException if serialization fails
+   */
+  public static byte[] mapJsonToBytes(JsonNode jsonNode) throws JsonProcessingException {
+    return jsonNode == null ? null : jsonMapper.writeValueAsBytes(jsonNode);
+  }
+}
diff --git
a/core/src/test/java/org/apache/hop/core/row/value/ValueMetaJsonTest.java
b/core/src/test/java/org/apache/hop/core/row/value/ValueMetaJsonTest.java
new file mode 100644
index 0000000000..e25e5c8963
--- /dev/null
+++ b/core/src/test/java/org/apache/hop/core/row/value/ValueMetaJsonTest.java
@@ -0,0 +1,228 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.core.row.value;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.node.BinaryNode;
+import com.fasterxml.jackson.databind.node.BooleanNode;
+import com.fasterxml.jackson.databind.node.DecimalNode;
+import com.fasterxml.jackson.databind.node.IntNode;
+import com.fasterxml.jackson.databind.node.MissingNode;
+import com.fasterxml.jackson.databind.node.NullNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import com.fasterxml.jackson.databind.node.TextNode;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.math.BigDecimal;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import org.apache.hop.core.Const;
+import org.apache.hop.core.database.IDatabase;
+import org.apache.hop.core.row.IValueMeta;
+import org.junit.Test;
+
+public class ValueMetaJsonTest {
+
+ private static final ObjectMapper mapper = new ObjectMapper();
+
+ private static JsonNode obj(String json) throws Exception {
+ return mapper.readTree(json);
+ }
+
+ // -------------------------
+ // Conversions
+ // -------------------------
+
+ @Test
+ public void testConvertPrettyPrinting() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ ObjectNode o = mapper.createObjectNode().put("a", 1).put("b", "x");
+
+ vm.setPrettyPrinting(false);
+ String compact = vm.getString(o);
+ assertEquals("{\"a\":1,\"b\":\"x\"}", compact);
+
+ vm.setPrettyPrinting(true);
+ String pretty = vm.getString(o);
+ assertTrue(pretty.contains("\n"));
+ assertTrue(pretty.contains(" \"a\""));
+ }
+
+ @Test
+ public void testCloneValueData() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ ObjectNode o = mapper.createObjectNode().put("a", 1);
+ JsonNode cloned = (JsonNode) vm.cloneValueData(o);
+ assertNotSame(o, cloned);
+ assertEquals(o, cloned);
+ }
+
+ @Test
+ public void testGetJsonAcrossStorageTypes() throws Exception {
+ // NORMAL with JsonNode
+ ValueMetaJson normal = new ValueMetaJson("j");
+ normal.setStorageType(IValueMeta.STORAGE_TYPE_NORMAL);
+ ObjectNode o = mapper.createObjectNode().put("k", "v");
+ assertSame(o, normal.getJson(o));
+
+ // NORMAL with Map
+ var mapped = normal.getJson(java.util.Map.of("a", 1));
+ assertEquals(obj("{\"a\":1}"), mapped);
+
+ // BINARY_STRING
+ ValueMetaJson bin = new ValueMetaJson("j");
+ bin.setStorageType(IValueMeta.STORAGE_TYPE_BINARY_STRING);
+ byte[] bytes =
"{\"x\":true}".getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ assertEquals(obj("{\"x\":true}"), bin.getJson(bytes));
+
+ // INDEXED
+ ValueMetaJson idx = new ValueMetaJson("j");
+ idx.setStorageType(IValueMeta.STORAGE_TYPE_INDEXED);
+ JsonNode v0 = obj("{\"i\":0}");
+ JsonNode v1 = obj("{\"i\":1}");
+ idx.setIndex(new Object[] {v0, v1});
+ assertEquals(v1, idx.getJson(1));
+ }
+
+ // -------------------------
+ // Comparison
+ // -------------------------
+
+ @Test
+ public void testJsonCompare() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+
+ // Kind precedence
+ JsonNode NULL = NullNode.getInstance();
+ JsonNode MISSING = MissingNode.getInstance();
+ JsonNode BIN = BinaryNode.valueOf(new byte[] {0x01});
+ JsonNode STR = TextNode.valueOf("a");
+ JsonNode NUM = DecimalNode.valueOf(new BigDecimal("2.5"));
+ JsonNode BOOL = BooleanNode.TRUE;
+ JsonNode ARY = mapper.createArrayNode().add(1).add(2);
+ JsonNode OBJ = mapper.createObjectNode().put("a", 1);
+
+ assertTrue(vm.typeCompare(NULL, MISSING) < 0);
+ assertTrue(vm.typeCompare(MISSING, BIN) < 0);
+ assertTrue(vm.typeCompare(BIN, STR) < 0);
+ assertTrue(vm.typeCompare(STR, NUM) < 0);
+ assertTrue(vm.typeCompare(NUM, BOOL) < 0);
+ assertTrue(vm.typeCompare(BOOL, ARY) < 0);
+ assertTrue(vm.typeCompare(ARY, OBJ) < 0);
+
+ // Same-kind
+ assertTrue(vm.typeCompare(IntNode.valueOf(10), IntNode.valueOf(11)) < 0);
+
+ // Arrays
+ JsonNode a1 = obj("[1,2]");
+ JsonNode a2 = obj("[1,3]");
+ assertTrue(vm.typeCompare(a1, a2) < 0);
+
+ // Objects
+ JsonNode o1 = obj("{\"a\":1,\"b\":2}");
+ JsonNode o2 = obj("{\"b\":2,\"a\":1}");
+ assertEquals(0, vm.typeCompare(o1, o2));
+ JsonNode o3 = obj("{\"a\":1,\"b\":3}");
+ assertTrue(vm.typeCompare(o1, o3) < 0);
+ }
+
+ @Test
+ public void testBinaryCompare() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ JsonNode a = BinaryNode.valueOf(new byte[] {(byte) 0xFF}); // 255
+ JsonNode b = BinaryNode.valueOf(new byte[] {0x00}); // 0
+ assertTrue(vm.typeCompare(a, b) > 0);
+
+ JsonNode c = BinaryNode.valueOf(new byte[] {0x01, 0x02}); // longer one
+ JsonNode d = BinaryNode.valueOf(new byte[] {0x01});
+ assertTrue(vm.typeCompare(c, d) > 0);
+ }
+
+ // -------------------------
+ // Read from ResultSet
+ // -------------------------
+
+ @Test
+ public void testGetValueFromResultSet_BinaryStream() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ IDatabase idb = mock(IDatabase.class);
+ ResultSet rs = mock(ResultSet.class);
+
+ String json = "{\"x\":42}";
+ InputStream in =
+ new
ByteArrayInputStream(json.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ when(rs.getBinaryStream(1)).thenReturn(in); // index+1 = 1
+
+ Object out = vm.getValueFromResultSet(idb, rs, 0);
+ assertTrue(out instanceof JsonNode);
+ assertEquals(obj(json), out);
+ }
+
+ @Test
+ public void testGetValueFromResultSet_FallbackObject() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ IDatabase idb = mock(IDatabase.class);
+ ResultSet rs = mock(ResultSet.class);
+
+ when(rs.getBinaryStream(1)).thenThrow(new SQLException("no binary"));
+ when(rs.getObject(1)).thenReturn("{\"k\":\"v\"}");
+
+ Object out = vm.getValueFromResultSet(idb, rs, 0);
+ assertEquals(obj("{\"k\":\"v\"}"), out);
+ }
+
+ // -------------------------
+ // Native type
+ // -------------------------
+
+ @Test
+ public void testNativeTypeInfo() throws Exception {
+ ValueMetaJson vm = new ValueMetaJson("j");
+ JsonNode node = obj("{\"a\":1}");
+ assertSame(node, vm.getNativeDataType(node));
+ assertEquals(JsonNode.class, vm.getNativeDataTypeClass());
+ }
+
+ @Test
+ public void testDatabaseColumnTypeDefinition() {
+ ValueMetaJson vm = new ValueMetaJson("col");
+ IDatabase pg = mock(IDatabase.class);
+ when(pg.isPostgresVariant()).thenReturn(true);
+ IDatabase other = mock(IDatabase.class);
+ when(other.isPostgresVariant()).thenReturn(false);
+
+ assertEquals("JSONB", vm.getDatabaseColumnTypeDefinition(pg, null, null,
false, false, false));
+ assertEquals(
+ "col JSONB" + Const.CR,
+ vm.getDatabaseColumnTypeDefinition(pg, null, null, false, true, true));
+
+ assertEquals(
+ "JSON", vm.getDatabaseColumnTypeDefinition(other, null, null, false,
false, false));
+ assertEquals(
+ "col JSON" + Const.CR,
+ vm.getDatabaseColumnTypeDefinition(other, null, null, false, true,
true));
+ }
+}
diff --git a/core/src/test/java/org/apache/hop/core/util/JsonUtilTest.java
b/core/src/test/java/org/apache/hop/core/util/JsonUtilTest.java
new file mode 100644
index 0000000000..19cd4f2ec4
--- /dev/null
+++ b/core/src/test/java/org/apache/hop/core/util/JsonUtilTest.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hop.core.util;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThrows;
+import static org.junit.Assert.assertTrue;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.List;
+import java.util.Map;
+import org.junit.Test;
+
+public class JsonUtilTest {
+
+ // -------------------------
+ // Singletons
+ // -------------------------
+
+ @Test
+ public void testSingletons() {
+ assertNotNull(JsonUtil.jsonReader());
+ assertNotNull(JsonUtil.jsonMapper());
+ // same instance each call
+ assertSame(JsonUtil.jsonReader(), JsonUtil.jsonReader());
+ assertSame(JsonUtil.jsonMapper(), JsonUtil.jsonMapper());
+ }
+
+ // -------------------------
+ // Parse
+ // -------------------------
+
+ @Test
+ public void testParseCharSequence() throws Exception {
+ JsonNode n = JsonUtil.parse("{a:1, b:\"x\"}");
+ assertEquals(1, n.get("a").intValue());
+ assertEquals("x", n.get("b").textValue());
+ }
+
+ @Test
+ public void testParseBytes() throws Exception {
+ byte[] bytes = "{\"k\":true}".getBytes(StandardCharsets.UTF_8);
+ JsonNode n = JsonUtil.parse(bytes);
+ assertTrue(n.get("k").booleanValue());
+ }
+
+ @Test
+ public void testParseStream() throws Exception {
+ InputStream in = new ByteArrayInputStream("{\"n\":42}".getBytes(StandardCharsets.UTF_8));
+ JsonNode n = JsonUtil.parse(in);
+ assertEquals(42, n.get("n").intValue());
+ }
+
+ @Test
+ public void testParseNulls() throws Exception {
+ assertNull(JsonUtil.parse((CharSequence) null));
+ assertNull(JsonUtil.parse((byte[]) null));
+ assertNull(JsonUtil.parse((InputStream) null));
+ }
+
+ @Test
+ public void testParseInvalid() {
+ assertThrows(JsonProcessingException.class, () -> JsonUtil.parse("{oops"));
+ }
+
+ @Test
+ public void testParseTextValue() throws Exception {
+ var bytes = "{\"a\":1}".getBytes(StandardCharsets.UTF_8);
+ assertEquals(1, JsonUtil.parseTextValue(bytes).get("a").intValue());
+
+ assertEquals(2, JsonUtil.parseTextValue("{\"b\":2}").get("b").intValue());
+
+ InputStream in = new ByteArrayInputStream("{\"c\":3}".getBytes(StandardCharsets.UTF_8));
+ assertEquals(3, JsonUtil.parseTextValue(in).get("c").intValue());
+
+ // generic object
+ Object custom =
+ new Object() {
+ @Override
+ public String toString() {
+ return "{\"d\":4}";
+ }
+ };
+ assertEquals(4, JsonUtil.parseTextValue(custom).get("d").intValue());
+
+ assertNull(JsonUtil.parseTextValue(null));
+ }
+
+ // -------------------------
+ // Mapping
+ // -------------------------
+
+ @Test
+ public void testMapObjectToJson() {
+ assertNull(JsonUtil.mapObjectToJson(null));
+
+ ObjectNode node = JsonUtil.jsonMapper().createObjectNode().put("x", 1);
+ assertSame(node, JsonUtil.mapObjectToJson(node));
+
+ JsonNode mappedMap = JsonUtil.mapObjectToJson(Map.of("a", 1, "b", "x"));
+ assertEquals(1, mappedMap.get("a").intValue());
+ assertEquals("x", mappedMap.get("b").textValue());
+
+ JsonNode mappedList = JsonUtil.mapObjectToJson(List.of(1, 2, 3));
+ assertEquals(3, mappedList.size());
+ assertEquals(2, mappedList.get(1).intValue());
+ }
+
+ @Test
+ public void testMapJsonToString() throws Exception {
+ ObjectNode o = JsonUtil.jsonMapper().createObjectNode().put("a", 1).put("b", "x");
+
+ String compact = JsonUtil.mapJsonToString(o, false);
+ assertEquals("{\"a\":1,\"b\":\"x\"}", compact);
+
+ String pretty = JsonUtil.mapJsonToString(o, true);
+ assertTrue(pretty.contains("\n"));
+ assertTrue(pretty.contains(" \"a\""));
+
+ assertNull(JsonUtil.mapJsonToString(null, true));
+ }
+
+ @Test
+ public void testMapJsonToBytes() throws Exception {
+ ObjectNode o = JsonUtil.jsonMapper().createObjectNode().put("a", "u");
+ byte[] bytes = JsonUtil.mapJsonToBytes(o);
+ assertNotNull(bytes);
+
+ JsonNode n = JsonUtil.parse(new ByteArrayInputStream(bytes));
+ assertEquals("u", n.get("a").textValue());
+
+ assertNull(JsonUtil.mapJsonToBytes(null));
+ }
+}
diff --git a/integration-tests/database/0033-postgres-json-insert-json.hpl b/integration-tests/database/0033-postgres-json-insert-json.hpl
new file mode 100644
index 0000000000..1d6e95541b
--- /dev/null
+++ b/integration-tests/database/0033-postgres-json-insert-json.hpl
@@ -0,0 +1,114 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>0033-postgres-json-insert-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+ <transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+ <transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 16:24:30.286</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 16:24:30.286</modified_date>
+ </info>
+ <notepads>
+ </notepads>
+ <order>
+ <hop>
+ <from>JSONB input</from>
+ <to>JSON output</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>JSONB input</name>
+ <type>TableInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <connection>unit-test-db</connection>
+ <execute_each_row>N</execute_each_row>
+ <limit>0</limit>
+ <sql>SELECT z_fighter
+FROM jsonb_test;</sql>
+ <variables_active>N</variables_active>
+ <attributes/>
+ <GUI>
+ <xloc>208</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>JSON output</name>
+ <type>TableOutput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <commit>1000</commit>
+ <connection>unit-test-db</connection>
+ <fields>
+ <field>
+ <column_name>z_fighter</column_name>
+ <stream_name>z_fighter</stream_name>
+ </field>
+ </fields>
+ <ignore_errors>N</ignore_errors>
+ <only_when_have_rows>N</only_when_have_rows>
+ <partitioning_daily>N</partitioning_daily>
+ <partitioning_enabled>N</partitioning_enabled>
+ <partitioning_field/>
+ <partitioning_monthly>Y</partitioning_monthly>
+ <return_field/>
+ <return_keys>N</return_keys>
+ <schema/>
+ <specify_fields>Y</specify_fields>
+ <table>json_test</table>
+ <tablename_field/>
+ <tablename_in_field>N</tablename_in_field>
+ <tablename_in_table>Y</tablename_in_table>
+ <truncate>N</truncate>
+ <use_batch>Y</use_batch>
+ <attributes/>
+ <GUI>
+ <xloc>400</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git a/integration-tests/database/0033-postgres-json-validate-json.hpl b/integration-tests/database/0033-postgres-json-validate-json.hpl
new file mode 100644
index 0000000000..582fb92217
--- /dev/null
+++ b/integration-tests/database/0033-postgres-json-validate-json.hpl
@@ -0,0 +1,275 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>0033-postgres-json-validate-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+ <transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+ <transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 16:27:21.481</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 16:27:21.481</modified_date>
+ </info>
+ <notepads>
+ </notepads>
+ <order>
+ <hop>
+ <from>JSONB input</from>
+ <to>Sort JSONB</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>JSON input</from>
+ <to>Sort JSON</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>Sort JSONB</from>
+ <to>Merge rows (diff)</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>Sort JSON</from>
+ <to>Merge rows (diff)</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>Merge rows (diff)</from>
+ <to>IS identical?</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>IS identical?</from>
+ <to>Abort</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>JSONB input</name>
+ <type>TableInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <connection>unit-test-db</connection>
+ <execute_each_row>N</execute_each_row>
+ <limit>0</limit>
+ <sql>SELECT z_fighter
+FROM jsonb_test
+ORDER BY z_fighter->'name' DESC;</sql>
+ <variables_active>N</variables_active>
+ <attributes/>
+ <GUI>
+ <xloc>224</xloc>
+ <yloc>112</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>JSON input</name>
+ <type>TableInput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <connection>unit-test-db</connection>
+ <execute_each_row>N</execute_each_row>
+ <limit>0</limit>
+ <sql>SELECT z_fighter
+FROM json_test;</sql>
+ <variables_active>N</variables_active>
+ <attributes/>
+ <GUI>
+ <xloc>224</xloc>
+ <yloc>224</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Sort JSONB</name>
+ <type>SortRows</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compress>N</compress>
+ <compress_variables/>
+ <directory>${java.io.tmpdir}</directory>
+ <fields>
+ <field>
+ <ascending>Y</ascending>
+ <case_sensitive>N</case_sensitive>
+ <collator_enabled>N</collator_enabled>
+ <collator_strength>0</collator_strength>
+ <name>z_fighter</name>
+ <presorted>N</presorted>
+ </field>
+ </fields>
+ <free_memory/>
+ <sort_prefix>out</sort_prefix>
+ <sort_size>1000000</sort_size>
+ <unique_rows>N</unique_rows>
+ <attributes/>
+ <GUI>
+ <xloc>368</xloc>
+ <yloc>112</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Sort JSON</name>
+ <type>SortRows</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compress>N</compress>
+ <compress_variables/>
+ <directory>${java.io.tmpdir}</directory>
+ <fields>
+ <field>
+ <ascending>Y</ascending>
+ <case_sensitive>N</case_sensitive>
+ <collator_enabled>N</collator_enabled>
+ <collator_strength>0</collator_strength>
+ <name>z_fighter</name>
+ <presorted>N</presorted>
+ </field>
+ </fields>
+ <free_memory/>
+ <sort_prefix>out</sort_prefix>
+ <sort_size>1000000</sort_size>
+ <unique_rows>N</unique_rows>
+ <attributes/>
+ <GUI>
+ <xloc>368</xloc>
+ <yloc>224</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Merge rows (diff)</name>
+ <type>MergeRows</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compare>Sort JSON</compare>
+ <diff-field/>
+ <flag_field>flagfield</flag_field>
+ <keys>
+ <key>z_fighter</key>
+ </keys>
+ <reference>Sort JSONB</reference>
+ <values>
+ <value>z_fighter</value>
+ </values>
+ <attributes/>
+ <GUI>
+ <xloc>512</xloc>
+ <yloc>176</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>IS identical?</name>
+ <type>FilterRows</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compare>
+ <condition>
+ <conditions>
+</conditions>
+ <function>=</function>
+ <leftvalue>flagfield</leftvalue>
+ <negated>N</negated>
+ <operator>-</operator>
+ <value>
+ <isnull>N</isnull>
+ <length>-1</length>
+ <mask/>
+ <name>constant</name>
+ <precision>-1</precision>
+ <text>identical</text>
+ <type>String</type>
+ </value>
+ </condition>
+ </compare>
+ <send_false_to>Abort</send_false_to>
+ <attributes/>
+ <GUI>
+ <xloc>656</xloc>
+ <yloc>176</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Abort</name>
+ <type>Abort</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <abort_option>ABORT_WITH_ERROR</abort_option>
+ <always_log_rows>Y</always_log_rows>
+ <message/>
+ <row_threshold>0</row_threshold>
+ <attributes/>
+ <GUI>
+ <xloc>800</xloc>
+ <yloc>176</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git a/integration-tests/database/main-0033-postgres-json.hwf b/integration-tests/database/main-0033-postgres-json.hwf
new file mode 100644
index 0000000000..3d7d328be4
--- /dev/null
+++ b/integration-tests/database/main-0033-postgres-json.hwf
@@ -0,0 +1,238 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+ <name>main-0033-postgres-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <workflow_version/>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 15:24:30.737</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 15:24:30.737</modified_date>
+ <parameters>
+ </parameters>
+ <actions>
+ <action>
+ <name>Start</name>
+ <description/>
+ <type>SPECIAL</type>
+ <attributes/>
+ <DayOfMonth>1</DayOfMonth>
+ <doNotWaitOnFirstExecution>N</doNotWaitOnFirstExecution>
+ <hour>12</hour>
+ <intervalMinutes>60</intervalMinutes>
+ <intervalSeconds>0</intervalSeconds>
+ <minutes>0</minutes>
+ <repeat>N</repeat>
+ <schedulerType>0</schedulerType>
+ <weekDay>1</weekDay>
+ <parallel>N</parallel>
+ <xloc>50</xloc>
+ <yloc>50</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>Init table</name>
+ <description/>
+ <type>SQL</type>
+ <attributes/>
+ <connection>unit-test-db</connection>
+ <sendOneStatement>N</sendOneStatement>
+ <sql>DROP TABLE IF EXISTS jsonb_test;
+
+CREATE TABLE jsonb_test
+(
+ z_fighter jsonb
+);
+
+INSERT INTO jsonb_test (z_fighter)
+VALUES ('{
+ "name": "Goku",
+ "height": 175,
+ "moves": [
+ {
+ "moveName": "Kamehameha",
+ "power": 9500
+ },
+ {
+ "moveName": "Spirit Bomb",
+ "power": 12000
+ }
+ ]
+}'::jsonb),
+ ('{
+ "name": "Vegeta",
+ "height": 164,
+ "moves": [
+ {
+ "moveName": "Galick Gun",
+ "power": 9000
+ },
+ {
+ "moveName": "Final Flash",
+ "power": 11500
+ }
+ ]
+ }'::jsonb),
+ ('{
+ "name": "Gohan",
+ "height": 176,
+ "moves": [
+ {
+ "moveName": "Masenko",
+ "power": 8500
+ },
+ {
+ "moveName": "Father-Son Kamehameha",
+ "power": 11000
+ }
+ ]
+ }'::jsonb),
+ ('{
+ "name": "Piccolo",
+ "height": 226,
+ "moves": [
+ {
+ "moveName": "Special Beam Cannon",
+ "power": 10000
+ },
+ {
+ "moveName": "Hellzone Grenade",
+ "power": 9500
+ }
+ ]
+ }'::jsonb),
+ ('{
+ "name": "Krillin",
+ "height": 153,
+ "moves": [
+ {
+ "moveName": "Destructo Disc",
+ "power": 8000
+ },
+ {
+ "moveName": "Solar Flare",
+ "power": 2000
+ }
+ ]
+ }'::jsonb);
+
+DROP TABLE IF EXISTS json_test;
+
+CREATE TABLE json_test
+(
+ z_fighter json
+);
+
+DROP TABLE IF EXISTS jsonb_moves;
+
+CREATE TABLE jsonb_moves
+(
+ move jsonb
+);
+</sql>
+ <sqlfromfile>N</sqlfromfile>
+ <useVariableSubstitution>N</useVariableSubstitution>
+ <parallel>N</parallel>
+ <xloc>192</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>0033-postgres-json-insert-json.hpl</name>
+ <description/>
+ <type>PIPELINE</type>
+ <attributes/>
+ <add_date>N</add_date>
+ <add_time>N</add_time>
+ <clear_files>N</clear_files>
+ <clear_rows>N</clear_rows>
+ <create_parent_folder>N</create_parent_folder>
+ <exec_per_row>N</exec_per_row>
+ <filename>${PROJECT_HOME}/0033-postgres-json-insert-json.hpl</filename>
+ <loglevel>Basic</loglevel>
+ <parameters>
+ <pass_all_parameters>Y</pass_all_parameters>
+ </parameters>
+ <params_from_previous>N</params_from_previous>
+ <run_configuration>local</run_configuration>
+ <set_append_logfile>N</set_append_logfile>
+ <set_logfile>N</set_logfile>
+ <wait_until_finished>Y</wait_until_finished>
+ <parallel>N</parallel>
+ <xloc>352</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>0033-postgres-json-validate-json.hpl</name>
+ <description/>
+ <type>PIPELINE</type>
+ <attributes/>
+ <add_date>N</add_date>
+ <add_time>N</add_time>
+ <clear_files>N</clear_files>
+ <clear_rows>N</clear_rows>
+ <create_parent_folder>N</create_parent_folder>
+ <exec_per_row>N</exec_per_row>
+ <filename>${PROJECT_HOME}/0033-postgres-json-validate-json.hpl</filename>
+ <loglevel>Basic</loglevel>
+ <parameters>
+ <pass_all_parameters>Y</pass_all_parameters>
+ </parameters>
+ <params_from_previous>N</params_from_previous>
+ <run_configuration>local</run_configuration>
+ <set_append_logfile>N</set_append_logfile>
+ <set_logfile>N</set_logfile>
+ <wait_until_finished>Y</wait_until_finished>
+ <parallel>N</parallel>
+ <xloc>608</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ </actions>
+ <hops>
+ <hop>
+ <from>Start</from>
+ <to>Init table</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>Y</unconditional>
+ </hop>
+ <hop>
+ <from>Init table</from>
+ <to>0033-postgres-json-insert-json.hpl</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>N</unconditional>
+ </hop>
+ <hop>
+ <from>0033-postgres-json-insert-json.hpl</from>
+ <to>0033-postgres-json-validate-json.hpl</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>N</unconditional>
+ </hop>
+ </hops>
+ <notepads>
+ </notepads>
+ <attributes/>
+</workflow>
diff --git a/integration-tests/json/0007-json-input-json-value.hpl b/integration-tests/json/0007-json-input-json-value.hpl
new file mode 100644
index 0000000000..6f3f5a26f2
--- /dev/null
+++ b/integration-tests/json/0007-json-input-json-value.hpl
@@ -0,0 +1,235 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>0007-json-input-json-value</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+ <transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+ <transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 16:39:01.337</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 16:39:01.337</modified_date>
+ </info>
+ <notepads>
+ </notepads>
+ <order>
+ <hop>
+ <from>Dummy JSON</from>
+ <to>Get Attributes</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>Get Attributes</from>
+ <to>Data validator</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>Get Attributes</from>
+ <to>Validate</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>Data validator</name>
+ <type>Validator</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <concat_errors>N</concat_errors>
+ <validate_all>N</validate_all>
+ <validator_field>
+ <allowed_value>
+</allowed_value>
+ <data_type>JSON</data_type>
+ <data_type_verified>Y</data_type_verified>
+ <is_sourcing_values>N</is_sourcing_values>
+ <max_length/>
+ <min_length/>
+ <name>secondMove</name>
+ <null_allowed>N</null_allowed>
+ <only_null_allowed>N</only_null_allowed>
+ <only_numeric_allowed>N</only_numeric_allowed>
+ <validation_name>JSON type</validation_name>
+ </validator_field>
+ <attributes/>
+ <GUI>
+ <xloc>352</xloc>
+ <yloc>240</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Dummy JSON</name>
+ <type>DataGrid</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <data>
+ <line>
+ <item>{"name": "Goku", "moves": [{"power": 9500, "moveName": "Kamehameha"}, {"power": 12000, "moveName": "Spirit Bomb"}]}</item>
+ </line>
+ <line>
+ <item>{"name": "Vegeta", "moves": [{"power": 9000, "moveName": "Galick Gun"}, {"power": 11500, "moveName": "Final Flash"}]}</item>
+ </line>
+ <line>
+ <item>{"name": "Gohan", "moves": [{"power": 8500, "moveName": "Masenko"}, {"power": 11000, "moveName": "Father-Son Kamehameha"}]}</item>
+ </line>
+ </data>
+ <fields>
+ <field>
+ <length>-1</length>
+ <precision>-1</precision>
+ <set_empty_string>N</set_empty_string>
+ <name>z_fighter</name>
+ <type>JSON</type>
+ </field>
+ </fields>
+ <attributes/>
+ <GUI>
+ <xloc>192</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Get Attributes</name>
+ <type>JsonInput</type>
+ <description/>
+ <distribute>N</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <include>N</include>
+ <include_field/>
+ <rownum>N</rownum>
+ <addresultfile>N</addresultfile>
+ <readurl>N</readurl>
+ <removeSourceField>Y</removeSourceField>
+ <IsIgnoreEmptyFile>N</IsIgnoreEmptyFile>
+ <doNotFailIfNoFile>Y</doNotFailIfNoFile>
+ <ignoreMissingPath>Y</ignoreMissingPath>
+ <defaultPathLeafToNull>Y</defaultPathLeafToNull>
+ <rownum_field/>
+ <file>
+ <name/>
+ <filemask/>
+ <exclude_filemask/>
+ <file_required>N</file_required>
+ <include_subfolders>N</include_subfolders>
+ </file>
+ <fields>
+ <field>
+ <name>name</name>
+ <path>name</path>
+ <type>String</type>
+ <format/>
+ <currency/>
+ <decimal/>
+ <group/>
+ <length>-1</length>
+ <precision>-1</precision>
+ <trim_type>none</trim_type>
+ <repeat>N</repeat>
+ </field>
+ <field>
+ <name>powerFirstMove</name>
+ <path>moves[0].power</path>
+ <type>Integer</type>
+ <format/>
+ <currency/>
+ <decimal/>
+ <group/>
+ <length>-1</length>
+ <precision>-1</precision>
+ <trim_type>none</trim_type>
+ <repeat>N</repeat>
+ </field>
+ <field>
+ <name>secondMove</name>
+ <path>moves[1]</path>
+ <type>JSON</type>
+ <format/>
+ <currency/>
+ <decimal/>
+ <group/>
+ <length>-1</length>
+ <precision>-1</precision>
+ <trim_type>none</trim_type>
+ <repeat>N</repeat>
+ </field>
+ </fields>
+ <limit>0</limit>
+ <IsInFields>Y</IsInFields>
+ <IsAFile>N</IsAFile>
+ <valueField>z_fighter</valueField>
+ <shortFileFieldName/>
+ <pathFieldName/>
+ <hiddenFieldName/>
+ <lastModificationTimeFieldName/>
+ <uriNameFieldName/>
+ <rootUriNameFieldName/>
+ <extensionFieldName/>
+ <sizeFieldName/>
+ <attributes/>
+ <GUI>
+ <xloc>352</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Validate</name>
+ <type>Dummy</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <attributes/>
+ <GUI>
+ <xloc>512</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git a/integration-tests/json/datasets/golden-json-input-json.csv b/integration-tests/json/datasets/golden-json-input-json.csv
new file mode 100644
index 0000000000..a55d103160
--- /dev/null
+++ b/integration-tests/json/datasets/golden-json-input-json.csv
@@ -0,0 +1,4 @@
+name,powerFirstMove
+Goku,9500
+Vegeta,9000
+Gohan,8500
diff --git a/integration-tests/json/main-0007-json-input-json-value.hwf b/integration-tests/json/main-0007-json-input-json-value.hwf
new file mode 100644
index 0000000000..34394d7ef5
--- /dev/null
+++ b/integration-tests/json/main-0007-json-input-json-value.hwf
@@ -0,0 +1,80 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+ <name>main-0007-json-input-json-value</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <workflow_version/>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 16:37:35.792</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 16:37:35.792</modified_date>
+ <parameters>
+ </parameters>
+ <actions>
+ <action>
+ <name>Start</name>
+ <description/>
+ <type>SPECIAL</type>
+ <attributes/>
+ <DayOfMonth>1</DayOfMonth>
+ <doNotWaitOnFirstExecution>N</doNotWaitOnFirstExecution>
+ <hour>12</hour>
+ <intervalMinutes>60</intervalMinutes>
+ <intervalSeconds>0</intervalSeconds>
+ <minutes>0</minutes>
+ <repeat>N</repeat>
+ <schedulerType>0</schedulerType>
+ <weekDay>1</weekDay>
+ <parallel>N</parallel>
+ <xloc>50</xloc>
+ <yloc>50</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>Run Pipeline Unit Tests</name>
+ <description/>
+ <type>RunPipelineTests</type>
+ <attributes/>
+ <test_names>
+ <test_name>
+ <name>0007-json-input-json-value UNIT</name>
+ </test_name>
+ </test_names>
+ <parallel>N</parallel>
+ <xloc>256</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ </actions>
+ <hops>
+ <hop>
+ <from>Start</from>
+ <to>Run Pipeline Unit Tests</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>Y</unconditional>
+ </hop>
+ </hops>
+ <notepads>
+ </notepads>
+ <attributes/>
+</workflow>
diff --git a/integration-tests/json/metadata/dataset/golden-json-input-json.json b/integration-tests/json/metadata/dataset/golden-json-input-json.json
new file mode 100644
index 0000000000..aa933412ca
--- /dev/null
+++ b/integration-tests/json/metadata/dataset/golden-json-input-json.json
@@ -0,0 +1,25 @@
+{
+ "base_filename": "golden-json-input-json.csv",
+ "virtualPath": "",
+ "name": "golden-json-input-json",
+ "description": "",
+ "dataset_fields": [
+ {
+ "field_comment": "",
+ "field_length": -1,
+ "field_type": 2,
+ "field_precision": -1,
+ "field_name": "name",
+ "field_format": ""
+ },
+ {
+ "field_comment": "",
+ "field_length": -1,
+ "field_type": 5,
+ "field_precision": -1,
+ "field_name": "powerFirstMove",
+ "field_format": ""
+ }
+ ],
+ "folder_name": "${HOP_DATASETS_FOLDER}"
+}
\ No newline at end of file
diff --git a/integration-tests/json/metadata/unit-test/0007-json-input-json-value UNIT.json b/integration-tests/json/metadata/unit-test/0007-json-input-json-value UNIT.json
new file mode 100644
index 0000000000..9aabbc2184
--- /dev/null
+++ b/integration-tests/json/metadata/unit-test/0007-json-input-json-value UNIT.json
@@ -0,0 +1,33 @@
+{
+ "database_replacements": [],
+ "autoOpening": true,
+ "description": "",
+ "persist_filename": "",
+ "test_type": "UNIT_TEST",
+ "variableValues": [],
+ "basePath": "",
+ "golden_data_sets": [
+ {
+ "field_mappings": [
+ {
+ "transform_field": "name",
+ "data_set_field": "name"
+ },
+ {
+ "transform_field": "powerFirstMove",
+ "data_set_field": "powerFirstMove"
+ }
+ ],
+ "field_order": [
+ "name",
+ "powerFirstMove"
+ ],
+ "data_set_name": "golden-json-input-json",
+ "transform_name": "Validate"
+ }
+ ],
+ "input_data_sets": [],
+ "name": "0007-json-input-json-value UNIT",
+ "trans_test_tweaks": [],
+ "pipeline_filename": "./0007-json-input-json-value.hpl"
+}
\ No newline at end of file
diff --git a/integration-tests/mongo/tests/mongo-json/main-mongo-json.hwf b/integration-tests/mongo/tests/mongo-json/main-mongo-json.hwf
new file mode 100644
index 0000000000..a19d0c9044
--- /dev/null
+++ b/integration-tests/mongo/tests/mongo-json/main-mongo-json.hwf
@@ -0,0 +1,163 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<workflow>
+ <name>main-mongo-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <workflow_version/>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 17:43:10.630</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 17:43:10.630</modified_date>
+ <parameters>
+ </parameters>
+ <actions>
+ <action>
+ <name>Start</name>
+ <description/>
+ <type>SPECIAL</type>
+ <attributes/>
+ <DayOfMonth>1</DayOfMonth>
+ <doNotWaitOnFirstExecution>N</doNotWaitOnFirstExecution>
+ <hour>12</hour>
+ <intervalMinutes>60</intervalMinutes>
+ <intervalSeconds>0</intervalSeconds>
+ <minutes>0</minutes>
+ <repeat>N</repeat>
+ <schedulerType>0</schedulerType>
+ <weekDay>1</weekDay>
+ <parallel>N</parallel>
+ <xloc>50</xloc>
+ <yloc>50</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>mongo-insert-json.hpl</name>
+ <description/>
+ <type>PIPELINE</type>
+ <attributes/>
+ <add_date>N</add_date>
+ <add_time>N</add_time>
+ <clear_files>N</clear_files>
+ <clear_rows>N</clear_rows>
+ <create_parent_folder>N</create_parent_folder>
+ <exec_per_row>N</exec_per_row>
+
<filename>${PROJECT_HOME}/tests/mongo-json/mongo-insert-json.hpl</filename>
+ <logext/>
+ <logfile/>
+ <loglevel>Basic</loglevel>
+ <parameters>
+ <pass_all_parameters>Y</pass_all_parameters>
+ </parameters>
+ <params_from_previous>N</params_from_previous>
+ <run_configuration>local</run_configuration>
+ <set_append_logfile>N</set_append_logfile>
+ <set_logfile>N</set_logfile>
+ <wait_until_finished>Y</wait_until_finished>
+ <parallel>N</parallel>
+ <xloc>224</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>mongo-delete-json.hpl</name>
+ <description/>
+ <type>PIPELINE</type>
+ <attributes/>
+ <add_date>N</add_date>
+ <add_time>N</add_time>
+ <clear_files>N</clear_files>
+ <clear_rows>N</clear_rows>
+ <create_parent_folder>N</create_parent_folder>
+ <exec_per_row>N</exec_per_row>
+
<filename>${PROJECT_HOME}/tests/mongo-json/mongo-delete-json.hpl</filename>
+ <logext/>
+ <logfile/>
+ <loglevel>Basic</loglevel>
+ <parameters>
+ <pass_all_parameters>Y</pass_all_parameters>
+ </parameters>
+ <params_from_previous>N</params_from_previous>
+ <run_configuration>local</run_configuration>
+ <set_append_logfile>N</set_append_logfile>
+ <set_logfile>N</set_logfile>
+ <wait_until_finished>Y</wait_until_finished>
+ <parallel>N</parallel>
+ <xloc>400</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ <action>
+ <name>mongo-read-json.hpl</name>
+ <description/>
+ <type>PIPELINE</type>
+ <attributes/>
+ <add_date>N</add_date>
+ <add_time>N</add_time>
+ <clear_files>N</clear_files>
+ <clear_rows>N</clear_rows>
+ <create_parent_folder>N</create_parent_folder>
+ <exec_per_row>N</exec_per_row>
+ <filename>${PROJECT_HOME}/tests/mongo-json/mongo-read-json.hpl</filename>
+ <logext/>
+ <logfile/>
+ <loglevel>Basic</loglevel>
+ <parameters>
+ <pass_all_parameters>Y</pass_all_parameters>
+ </parameters>
+ <params_from_previous>N</params_from_previous>
+ <run_configuration>local</run_configuration>
+ <set_append_logfile>N</set_append_logfile>
+ <set_logfile>N</set_logfile>
+ <wait_until_finished>Y</wait_until_finished>
+ <parallel>N</parallel>
+ <xloc>576</xloc>
+ <yloc>48</yloc>
+ <attributes_hac/>
+ </action>
+ </actions>
+ <hops>
+ <hop>
+ <from>Start</from>
+ <to>mongo-insert-json.hpl</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>Y</unconditional>
+ </hop>
+ <hop>
+ <from>mongo-insert-json.hpl</from>
+ <to>mongo-delete-json.hpl</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>N</unconditional>
+ </hop>
+ <hop>
+ <from>mongo-delete-json.hpl</from>
+ <to>mongo-read-json.hpl</to>
+ <enabled>Y</enabled>
+ <evaluation>Y</evaluation>
+ <unconditional>N</unconditional>
+ </hop>
+ </hops>
+ <notepads>
+ </notepads>
+ <attributes/>
+</workflow>
diff --git a/integration-tests/mongo/tests/mongo-json/mongo-delete-json.hpl
b/integration-tests/mongo/tests/mongo-json/mongo-delete-json.hpl
new file mode 100644
index 0000000000..27a75dfbf5
--- /dev/null
+++ b/integration-tests/mongo/tests/mongo-json/mongo-delete-json.hpl
@@ -0,0 +1,141 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>mongo-delete-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 17:47:34.130</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 17:47:34.130</modified_date>
+ </info>
+ <notepads>
+ <notepad>
+ <backgroundcolorblue>251</backgroundcolorblue>
+ <backgroundcolorgreen>232</backgroundcolorgreen>
+ <backgroundcolorred>201</backgroundcolorred>
+ <bordercolorblue>90</bordercolorblue>
+ <bordercolorgreen>58</bordercolorgreen>
+ <bordercolorred>14</bordercolorred>
+ <fontbold>N</fontbold>
+ <fontcolorblue>90</fontcolorblue>
+ <fontcolorgreen>58</fontcolorgreen>
+ <fontcolorred>14</fontcolorred>
+ <fontitalic>N</fontitalic>
+ <fontname>Segoe UI</fontname>
+ <fontsize>9</fontsize>
+ <height>26</height>
+ <xloc>512</xloc>
+ <yloc>112</yloc>
+ <note>Deletes 2 of the rows inserted by mongo-insert-json in
json_insert.</note>
+ <width>358</width>
+ </notepad>
+ </notepads>
+ <order>
+ <hop>
+ <from>Data grid</from>
+ <to>MongoDB Delete</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>MongoDB Delete</name>
+ <type>MongoDbDelete</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <collection>json_insert</collection>
+ <connection>mongo</connection>
+ <execute_for_each_row>N</execute_for_each_row>
+ <fields>
+ <field>
+ <comparator>=</comparator>
+ <doc_path>z_fighter</doc_path>
+ <incoming_field_1>z_fighter</incoming_field_1>
+ <incoming_field_2/>
+ </field>
+ </fields>
+ <json_query/>
+ <retries>5</retries>
+ <retry_delay>10</retry_delay>
+ <use_json_query>N</use_json_query>
+ <write_retries>5</write_retries>
+ <attributes/>
+ <GUI>
+ <xloc>720</xloc>
+ <yloc>176</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Data grid</name>
+ <type>DataGrid</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <data>
+ <line>
+ <item>{"name": "Goku", "moves": [{"power": 9500, "moveName":
"Kamehameha"}, {"power": 12000, "moveName": "Spirit Bomb"}], "height":
175}</item>
+ </line>
+ <line>
+ <item>{"name": "Piccolo", "moves": [{"power": 10000, "moveName":
"Special Beam Cannon"}, {"power": 9500, "moveName": "Hellzone Grenade"}],
"height": 226}</item>
+ </line>
+ </data>
+ <fields>
+ <field>
+ <length>-1</length>
+ <precision>-1</precision>
+ <currency/>
+ <set_empty_string>N</set_empty_string>
+ <name>z_fighter</name>
+ <format/>
+ <group/>
+ <decimal/>
+ <type>JSON</type>
+ </field>
+ </fields>
+ <attributes/>
+ <GUI>
+ <xloc>528</xloc>
+ <yloc>176</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git a/integration-tests/mongo/tests/mongo-json/mongo-insert-json.hpl
b/integration-tests/mongo/tests/mongo-json/mongo-insert-json.hpl
new file mode 100644
index 0000000000..257e8fd8a4
--- /dev/null
+++ b/integration-tests/mongo/tests/mongo-json/mongo-insert-json.hpl
@@ -0,0 +1,126 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>mongo-insert-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 17:43:53.782</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 17:43:53.782</modified_date>
+ </info>
+ <notepads>
+ </notepads>
+ <order>
+ <hop>
+ <from>Data grid</from>
+ <to>insert</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>Data grid</name>
+ <type>DataGrid</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <data>
+ <line>
+ <item>{"name": "Goku", "moves": [{"power": 9500, "moveName":
"Kamehameha"}, {"power": 12000, "moveName": "Spirit Bomb"}], "height":
175}</item>
+ </line>
+ <line>
+ <item>{"name": "Vegeta", "moves": [{"power": 9000, "moveName": "Galick
Gun"}, {"power": 11500, "moveName": "Final Flash"}], "height": 164}</item>
+ </line>
+ <line>
+ <item>{"name": "Piccolo", "moves": [{"power": 10000, "moveName":
"Special Beam Cannon"}, {"power": 9500, "moveName": "Hellzone Grenade"}],
"height": 226}</item>
+ </line>
+ </data>
+ <fields>
+ <field>
+ <length>-1</length>
+ <precision>-1</precision>
+ <set_empty_string>N</set_empty_string>
+ <name>z_fighter</name>
+ <type>JSON</type>
+ </field>
+ </fields>
+ <attributes/>
+ <GUI>
+ <xloc>288</xloc>
+ <yloc>112</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>insert</name>
+ <type>MongoDbOutput</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <connection>mongo</connection>
+ <mongo_collection>json_insert</mongo_collection>
+ <batch_insert_size>5</batch_insert_size>
+ <truncate>Y</truncate>
+ <update>N</update>
+ <upsert>N</upsert>
+ <multi>N</multi>
+ <modifier_update>N</modifier_update>
+ <write_retries>5</write_retries>
+ <write_retry_delay>10</write_retry_delay>
+ <mongo_fields>
+ <mongo_field>
+ <incoming_field_name>z_fighter</incoming_field_name>
+ <mongo_doc_path>z_fighter</mongo_doc_path>
+
<use_incoming_field_name_as_mongo_field_name>N</use_incoming_field_name_as_mongo_field_name>
+ <update_match_field>N</update_match_field>
+ <modifier_update_operation>N/A</modifier_update_operation>
+ <modifier_policy>Insert&amp;Update</modifier_policy>
+ <json_field>N</json_field>
+ <allow_null>N</allow_null>
+ </mongo_field>
+ </mongo_fields>
+ <attributes/>
+ <GUI>
+ <xloc>480</xloc>
+ <yloc>112</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git a/integration-tests/mongo/tests/mongo-json/mongo-read-json.hpl
b/integration-tests/mongo/tests/mongo-json/mongo-read-json.hpl
new file mode 100644
index 0000000000..8fcb5e7ece
--- /dev/null
+++ b/integration-tests/mongo/tests/mongo-json/mongo-read-json.hpl
@@ -0,0 +1,207 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements. See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to You under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+-->
+<pipeline>
+ <info>
+ <name>mongo-read-json</name>
+ <name_sync_with_filename>Y</name_sync_with_filename>
+ <description/>
+ <extended_description/>
+ <pipeline_version/>
+ <pipeline_type>Normal</pipeline_type>
+ <parameters>
+ </parameters>
+ <capture_transform_performance>N</capture_transform_performance>
+
<transform_performance_capturing_delay>1000</transform_performance_capturing_delay>
+
<transform_performance_capturing_size_limit>100</transform_performance_capturing_size_limit>
+ <created_user>-</created_user>
+ <created_date>2025/10/11 17:50:17.818</created_date>
+ <modified_user>-</modified_user>
+ <modified_date>2025/10/11 17:50:17.818</modified_date>
+ </info>
+ <notepads>
+ <notepad>
+ <backgroundcolorblue>251</backgroundcolorblue>
+ <backgroundcolorgreen>232</backgroundcolorgreen>
+ <backgroundcolorred>201</backgroundcolorred>
+ <bordercolorblue>90</bordercolorblue>
+ <bordercolorgreen>58</bordercolorgreen>
+ <bordercolorred>14</bordercolorred>
+ <fontbold>N</fontbold>
+ <fontcolorblue>90</fontcolorblue>
+ <fontcolorgreen>58</fontcolorgreen>
+ <fontcolorred>14</fontcolorred>
+ <fontitalic>N</fontitalic>
+ <fontname>Segoe UI</fontname>
+ <fontsize>9</fontsize>
+ <height>26</height>
+ <xloc>128</xloc>
+ <yloc>64</yloc>
+ <note>If mongo-insert-json and mongo-delete-json successfully executed,
json_insert will have 1 row.</note>
+ <width>511</width>
+ </notepad>
+ </notepads>
+ <order>
+ <hop>
+ <from>Is 11500?</from>
+ <to>Abort</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>MongoDB input</from>
+ <to>Data validator</to>
+ <enabled>Y</enabled>
+ </hop>
+ <hop>
+ <from>MongoDB input</from>
+ <to>Is 11500?</to>
+ <enabled>Y</enabled>
+ </hop>
+ </order>
+ <transform>
+ <name>Abort</name>
+ <type>Abort</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <abort_option>ABORT_WITH_ERROR</abort_option>
+ <always_log_rows>Y</always_log_rows>
+ <row_threshold>0</row_threshold>
+ <attributes/>
+ <GUI>
+ <xloc>512</xloc>
+ <yloc>224</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Data validator</name>
+ <type>Validator</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <concat_errors>N</concat_errors>
+ <validate_all>N</validate_all>
+ <validator_field>
+ <allowed_value>
+</allowed_value>
+ <data_type>JSON</data_type>
+ <data_type_verified>Y</data_type_verified>
+ <is_sourcing_values>N</is_sourcing_values>
+ <max_length/>
+ <min_length/>
+ <name>secondMove</name>
+ <null_allowed>N</null_allowed>
+ <only_null_allowed>N</only_null_allowed>
+ <only_numeric_allowed>N</only_numeric_allowed>
+ <validation_name>json_type</validation_name>
+ </validator_field>
+ <attributes/>
+ <GUI>
+ <xloc>352</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>Is 11500?</name>
+ <type>FilterRows</type>
+ <description/>
+ <distribute>Y</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <compare>
+ <condition>
+ <conditions>
+</conditions>
+ <function>=</function>
+ <leftvalue>secondMovePower</leftvalue>
+ <negated>N</negated>
+ <operator>-</operator>
+ <value>
+ <isnull>N</isnull>
+ <length>-1</length>
+ <mask>####0;-####0</mask>
+ <name>constant</name>
+ <precision>0</precision>
+ <text>11500</text>
+ <type>Integer</type>
+ </value>
+ </condition>
+ </compare>
+ <send_false_to>Abort</send_false_to>
+ <attributes/>
+ <GUI>
+ <xloc>352</xloc>
+ <yloc>224</yloc>
+ </GUI>
+ </transform>
+ <transform>
+ <name>MongoDB input</name>
+ <type>MongoDbInput</type>
+ <description/>
+ <distribute>N</distribute>
+ <custom_distribution/>
+ <copies>1</copies>
+ <partitioning>
+ <method>none</method>
+ <schema_name/>
+ </partitioning>
+ <connection>mongo</connection>
+ <fields_name>{"_id":0, "z_fighter": 1}</fields_name>
+ <collection>json_insert</collection>
+ <json_field_name>json</json_field_name>
+ <json_query/>
+ <output_json>N</output_json>
+ <query_is_pipeline>N</query_is_pipeline>
+ <execute_for_each_row>N</execute_for_each_row>
+ <mongo_fields>
+ <mongo_field>
+ <field_name>secondMove</field_name>
+ <field_path>z_fighter.moves[1]</field_path>
+ <field_type>JSON</field_type>
+ </mongo_field>
+ <mongo_field>
+ <field_name>secondMovePower</field_name>
+ <field_path>z_fighter.moves[1].power</field_path>
+ <field_type>Integer</field_type>
+ </mongo_field>
+ </mongo_fields>
+ <attributes/>
+ <GUI>
+ <xloc>128</xloc>
+ <yloc>128</yloc>
+ </GUI>
+ </transform>
+ <transform_error_handling>
+ </transform_error_handling>
+ <attributes/>
+</pipeline>
diff --git
a/plugins/transforms/calculator/src/main/java/org/apache/hop/pipeline/transforms/calculator/calculations/math/Round2.java
b/plugins/transforms/calculator/src/main/java/org/apache/hop/pipeline/transforms/calculator/calculations/math/Round2.java
index 8c5c55ee9a..d6ae3b1577 100644
---
a/plugins/transforms/calculator/src/main/java/org/apache/hop/pipeline/transforms/calculator/calculations/math/Round2.java
+++
b/plugins/transforms/calculator/src/main/java/org/apache/hop/pipeline/transforms/calculator/calculations/math/Round2.java
@@ -31,7 +31,7 @@ public class Round2 implements ICalculation {
@Override
public String getCode() {
- return "ROUND_1";
+ return "ROUND_2";
}
@Override
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
index 321e5fcf4c..a3d1a135f2 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInput.java
@@ -17,6 +17,8 @@
package org.apache.hop.pipeline.transforms.jsoninput;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.JsonNodeFactory;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -55,7 +57,13 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
private RowOutputConverter rowOutputConverter;
- private static final byte[] EMPTY_JSON = "{}".getBytes(); // for replacing
null inputs
+ /** for replacing null inputs */
+ private static final byte[] EMPTY_JSON = "{}".getBytes();
+
+ /** for replacing null inputs when input value is JsonNode */
+ private static final JsonNode EMPTY_OBJECT_NODE =
JsonNodeFactory.instance.objectNode();
+
+ private boolean isProcessingJson;
public JsonInput(
TransformMeta transformMeta,
@@ -197,7 +205,25 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
// Create convert meta-data objects that will contain Date & Number
formatters
data.convertRowMeta =
data.outputRowMeta.cloneToType(IValueMeta.TYPE_STRING);
- data.inputs = new InputsReader(this, meta, data, new
InputErrorHandler()).iterator();
+
+ // behave differently if the incoming value is JsonNode or String
+ if (data.inputRowMeta != null
+ && data.inputRowMeta.getValueMeta(data.indexSourceField) != null
+ && data.inputRowMeta.getValueMeta(data.indexSourceField).isJson()) {
+ // JsonNode
+ //
+ data.jsonInputs =
+ new InputsReader(this, meta, data, new
InputErrorHandler()).jsonFieldIterator();
+ data.inputs = null;
+ isProcessingJson = true;
+ } else {
+ // String
+ //
+ data.inputs = new InputsReader(this, meta, data, new
InputErrorHandler()).iterator();
+ data.jsonInputs = null;
+ isProcessingJson = false;
+ }
+
data.readerRowSet = new QueueRowSet();
data.readerRowSet.setDone();
this.rowOutputConverter = new RowOutputConverter(getLogChannel());
@@ -257,7 +283,7 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
private void parseNextInputToRowSet(InputStream input) throws HopException {
try {
- data.readerRowSet = data.reader.parse(input);
+ data.readerRowSet = data.reader.parseStringValue(input);
} catch (HopException ke) {
logInputError(ke);
throw new JsonInputException(ke);
@@ -269,6 +295,15 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
}
}
+ private void parseNextJsonToRowSet(JsonNode node) throws HopException {
+ try {
+ data.readerRowSet = data.reader.parseJsonNodeValue(node);
+ } catch (Exception e) {
+ logInputError(e);
+ throw new JsonInputException(e);
+ }
+ }
+
private void logInputError(HopException e) {
logError(e.getLocalizedMessage(), e);
inputError(e.getLocalizedMessage());
@@ -329,25 +364,48 @@ public class JsonInput extends
BaseFileInputTransform<JsonInputMeta, JsonInputDa
return null;
}
Object[] rawReaderRow = null;
- while ((rawReaderRow = data.readerRowSet.getRow()) == null) {
- if (data.inputs.hasNext() && data.readerRowSet.isDone()) {
- try (InputStream nextIn = data.inputs.next()) {
- if (nextIn != null) {
- parseNextInputToRowSet(nextIn);
+ if (isProcessingJson) {
+ // If the incoming field is a JsonNode, don't do conversion,
+ // just get the value at the path specified by the user
+ while ((rawReaderRow = data.readerRowSet.getRow()) == null) {
+ if (data.jsonInputs.hasNext() && data.readerRowSet.isDone()) {
+ JsonNode nextNode = data.jsonInputs.next();
+
+ if (nextNode != null) {
+ parseNextJsonToRowSet(nextNode);
} else {
- parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+ parseNextJsonToRowSet(EMPTY_OBJECT_NODE);
}
- } catch (IOException e) {
- logError(BaseMessages.getString(PKG,
"JsonInput.Log.UnexpectedError", e.toString()), e);
- incrementErrors();
+ } else {
+ if (isDetailed()) {
+ logDetailed(BaseMessages.getString(PKG,
"JsonInput.Log.FinishedProcessing"));
+ }
+ return null;
}
- } else {
- if (isDetailed()) {
- logDetailed(BaseMessages.getString(PKG,
"JsonInput.Log.FinishedProcessing"));
+ }
+ } else {
+ while ((rawReaderRow = data.readerRowSet.getRow()) == null) {
+ if (data.inputs.hasNext() && data.readerRowSet.isDone()) {
+ try (InputStream nextIn = data.inputs.next()) {
+
+ if (nextIn != null) {
+ parseNextInputToRowSet(nextIn);
+ } else {
+ parseNextInputToRowSet(new ByteArrayInputStream(EMPTY_JSON));
+ }
+
+ } catch (IOException e) {
+ logError(BaseMessages.getString(PKG,
"JsonInput.Log.UnexpectedError", e.toString()), e);
+ incrementErrors();
+ }
+ } else {
+ if (isDetailed()) {
+ logDetailed(BaseMessages.getString(PKG,
"JsonInput.Log.FinishedProcessing"));
+ }
+ return null;
}
- return null;
}
}
Object[] outputRow = rowOutputConverter.getRow(buildBaseOutputRow(),
rawReaderRow, data);
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
index 4faac7a296..a7abee5571 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputData.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.jsoninput;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.InputStream;
import java.util.BitSet;
import java.util.Iterator;
@@ -48,6 +49,7 @@ public class JsonInputData extends BaseFileInputTransformData
implements ITransf
public int indexSourceField;
public Iterator<InputStream> inputs;
+ public Iterator<JsonNode> jsonInputs; // if incoming field is JsonNode
public IJsonReader reader;
public IRowSet readerRowSet;
public BitSet repeatedFields;
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
index b27813d927..f67e581393 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/FastJsonReader.java
@@ -17,15 +17,21 @@
package org.apache.hop.pipeline.transforms.jsoninput.reader;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ArrayNode;
import com.jayway.jsonpath.Configuration;
import com.jayway.jsonpath.JsonPath;
import com.jayway.jsonpath.Option;
import com.jayway.jsonpath.ParseContext;
import com.jayway.jsonpath.ReadContext;
+import com.jayway.jsonpath.spi.json.JacksonJsonNodeJsonProvider;
+import com.jayway.jsonpath.spi.mapper.JacksonMappingProvider;
import java.io.InputStream;
+import java.util.AbstractList;
import java.util.ArrayList;
import java.util.EnumSet;
import java.util.List;
+import java.util.RandomAccess;
import org.apache.hop.core.Const;
import org.apache.hop.core.IRowSet;
import org.apache.hop.core.SingleRowRowSet;
@@ -44,8 +50,13 @@ public class FastJsonReader implements IJsonReader {
private static final String JSON_CHARSET = "UTF-8";
private ReadContext jsonReadContext;
+
+ /** used if the incoming value is a String */
private Configuration jsonConfiguration;
+ /** used if the incoming value is a JsonNode */
+ private Configuration jsonNodeConfiguration;
+
private boolean ignoreMissingPath;
private boolean defaultPathLeafToNull;
@@ -61,6 +72,7 @@ public class FastJsonReader implements IJsonReader {
this.ignoreMissingPath = false;
this.defaultPathLeafToNull = true;
this.jsonConfiguration =
Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+ this.jsonNodeConfiguration = getJacksonNodeJsonPathConfig();
this.log = log;
}
@@ -85,6 +97,14 @@ public class FastJsonReader implements IJsonReader {
}
}
+ private Configuration getJacksonNodeJsonPathConfig() {
+ return Configuration.builder()
+ .jsonProvider(new JacksonJsonNodeJsonProvider())
+ .mappingProvider(new JacksonMappingProvider())
+ .options(DEFAULT_OPTIONS)
+ .build();
+ }
+
public boolean isDefaultPathLeafToNull() {
return defaultPathLeafToNull;
}
@@ -121,10 +141,16 @@ public class FastJsonReader implements IJsonReader {
this.ignoreMissingPath = value;
}
+ // used if incoming value is String
private ParseContext getParseContext() {
return JsonPath.using(jsonConfiguration);
}
+ // used if incoming value is JsonNode
+ private ParseContext getJsonNodeParseContext() {
+ return JsonPath.using(jsonNodeConfiguration);
+ }
+
private ReadContext getReadContext() {
return jsonReadContext;
}
@@ -150,6 +176,13 @@ public class FastJsonReader implements IJsonReader {
}
}
+ protected void readInput(JsonNode node) throws HopException {
+ jsonReadContext = getJsonNodeParseContext().parse(node);
+ if (jsonReadContext == null) {
+ throw new HopException(BaseMessages.getString(PKG,
"JsonReader.Error.ReadUrl.Null"));
+ }
+ }
+
@Override
public boolean isIgnoreMissingPath() {
return this.ignoreMissingPath;
@@ -162,8 +195,18 @@ public class FastJsonReader implements IJsonReader {
}
@Override
- public IRowSet parse(InputStream in) throws HopException {
+ public IRowSet parseStringValue(InputStream in) throws HopException {
readInput(in);
+ return getRow();
+ }
+
+ @Override
+ public IRowSet parseJsonNodeValue(JsonNode node) throws HopException {
+ readInput(node);
+ return getRow();
+ }
+
+ private IRowSet getRow() throws HopException {
List<List<?>> results = evalCombinedResult();
int len = results.isEmpty() ? 0 : getMaxRowSize(results);
if (log.isDetailed()) {
@@ -262,7 +305,8 @@ public class FastJsonReader implements IJsonReader {
List<List<?>> results = new ArrayList<>(paths.length);
int i = 0;
for (JsonPath path : paths) {
- List<Object> result = getReadContext().read(path);
+ Object raw = getReadContext().read(path);
+ List<Object> result = normalizeJsonPathResult(raw);
if (result.size() != lastSize && lastSize > 0 && !result.isEmpty()) {
throw new JsonInputException(
BaseMessages.getString(
@@ -293,4 +337,42 @@ public class FastJsonReader implements IJsonReader {
}
return true;
}
+
+ @SuppressWarnings("unchecked")
+ private static List<Object> normalizeJsonPathResult(Object r) throws
JsonInputException {
+ if (r instanceof List<?>) {
+ // Already a List
+ return (List<Object>) r;
+ }
+ if (r instanceof ArrayNode arr) {
+ // expose array elements as a List view,
+ // doesn't do conversion for performance
+ return new ArrayNodeListView(arr);
+ }
+
+ throw new JsonInputException(
+ "Unexpected JsonPath result type: "
+ + r.getClass().getName()
+ + ". Expected List<?> or ArrayNode.");
+ }
+
+ /** A List view over an ArrayNode's elements to use its nodes without doing
conversion. */
+ private static final class ArrayNodeListView extends AbstractList<Object>
+ implements RandomAccess {
+ private final ArrayNode arr;
+
+ ArrayNodeListView(ArrayNode arr) {
+ this.arr = arr;
+ }
+
+ @Override
+ public Object get(int index) {
+ return arr.get(index);
+ } // returns JsonNode
+
+ @Override
+ public int size() {
+ return arr.size();
+ }
+ }
}
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
index 6c020b70db..11836521fa 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/IJsonReader.java
@@ -37,5 +37,8 @@ public interface IJsonReader {
void setIgnoreMissingPath(boolean value);
/** parse compiled fields into a rowset */
- public IRowSet parse(InputStream in) throws HopException;
+ public IRowSet parseStringValue(InputStream in) throws HopException;
+
+ /** parse incoming JsonNode fields into a rowset */
+ public IRowSet parseJsonNodeValue(com.fasterxml.jackson.databind.JsonNode
in) throws HopException;
}
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
index 047347b104..ee601ffc23 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/InputsReader.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.jsoninput.reader;
+import com.fasterxml.jackson.databind.JsonNode;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
@@ -81,6 +82,11 @@ public class InputsReader implements Iterable<InputStream> {
}
}
+  /**
+   * Creates an iterator over the source field (index {@code data.indexSourceField}) of the
+   * incoming rows, yielding each value as a {@link JsonNode}.
+   */
+  public Iterator<JsonNode> jsonFieldIterator() {
+    RowIterator rows = new RowIterator(transform, data, errorHandler);
+    return new JsonFieldIterator(rows, data.indexSourceField);
+  }
+
protected StringFieldIterator getFieldIterator() {
return new StringFieldIterator(
new RowIterator(transform, data, errorHandler), data.indexSourceField);
@@ -215,7 +221,49 @@ public class InputsReader implements Iterable<InputStream>
{
@Override
public String next() {
Object[] row = rowIter.next();
- return (row == null || row.length <= idx) ? null : (String) row[idx];
+ if (row == null || row.length <= idx) return null;
+ Object v = row[idx];
+ if (v == null) return null;
+
+ if (v instanceof String vString) return vString;
+
+ throw new ClassCastException(
+ "Field at index " + idx + " is " + v.getClass().getName() + ",
expected String.");
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException(CONST_REMOVE);
+ }
+ }
+
+ protected class JsonFieldIterator implements Iterator<JsonNode> {
+ private final RowIterator rowIter;
+ private final int idx;
+
+ public JsonFieldIterator(RowIterator rowIter, int idx) {
+ this.rowIter = rowIter;
+ this.idx = idx;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return rowIter.hasNext();
+ }
+
+ @Override
+ public JsonNode next() {
+ Object[] row = rowIter.next();
+ if (row == null || row.length <= idx) return null;
+ Object v = row[idx];
+ if (v == null) return null;
+
+ if (v instanceof JsonNode node) {
+ return node;
+ }
+
+ throw new ClassCastException(
+ "Field at index " + idx + " is " + v.getClass().getName() + ",
expected JsonNode.");
}
@Override
diff --git
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/RowOutputConverter.java
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/RowOutputConverter.java
index 234101707c..475631627a 100644
---
a/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/RowOutputConverter.java
+++
b/plugins/transforms/json/src/main/java/org/apache/hop/pipeline/transforms/jsoninput/reader/RowOutputConverter.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.jsoninput.reader;
+import com.fasterxml.jackson.databind.node.TextNode;
import java.util.Map;
import net.minidev.json.JSONObject;
import org.apache.hop.core.exception.HopException;
@@ -59,6 +60,9 @@ public class RowOutputConverter {
if (jo instanceof Map) {
Map<String, ?> asStrMap = (Map<String, ?>) jo;
nodevalue = JSONObject.toJSONString(asStrMap);
+ } else if (jo instanceof TextNode jot) {
+ // this avoids returning string enclosed by "" if JsonNode
+ nodevalue = jot.asText();
} else {
nodevalue = jo.toString();
}
diff --git
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
index efbc05af58..0732f98a1b 100644
---
a/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
+++
b/plugins/transforms/json/src/test/java/org/apache/hop/pipeline/transforms/jsoninput/JsonInputTest.java
@@ -46,6 +46,7 @@ import org.apache.hop.core.HopClientEnvironment;
import org.apache.hop.core.IRowSet;
import org.apache.hop.core.exception.HopException;
import org.apache.hop.core.exception.HopFileException;
+import org.apache.hop.core.exception.HopPluginException;
import org.apache.hop.core.fileinput.FileInputList;
import org.apache.hop.core.json.HopJson;
import org.apache.hop.core.logging.ILoggingObject;
@@ -53,6 +54,7 @@ import org.apache.hop.core.logging.LogLevel;
import org.apache.hop.core.row.IRowMeta;
import org.apache.hop.core.row.IValueMeta;
import org.apache.hop.core.row.RowMeta;
+import org.apache.hop.core.row.value.ValueMetaFactory;
import org.apache.hop.core.row.value.ValueMetaInteger;
import org.apache.hop.core.row.value.ValueMetaNumber;
import org.apache.hop.core.row.value.ValueMetaString;
@@ -724,6 +726,29 @@ class JsonInputTest {
assertEquals(1, jsonInput.getLinesWritten(), "rows written");
}
+  /**
+   * Reads a string member selected through a JsonPath filter from a JsonNode input value and
+   * expects the plain text (no surrounding quotes) in the output row.
+   */
+  @Test
+  void testSingleObjPred_withJsonNodeInput() throws Exception {
+    ByteArrayOutputStream errorLog = new ByteArrayOutputStream();
+    helper.redirectLog(errorLog, LogLevel.ERROR);
+
+    JsonInputField colorField = new JsonInputField("color");
+    colorField.setPath("$.store.bicycle[?(@.price)].color");
+    // expect plain text (no quotes) in output
+    colorField.setType(IValueMeta.TYPE_STRING);
+
+    JsonInputMeta meta = createSimpleMeta("json", colorField);
+    meta.setRemoveSourceField(true);
+
+    Object[] inputRow = {getBasicTestJsonNode()};
+    JsonInput jsonInput = createJsonInputWithJsonNode("json", meta, inputRow);
+    jsonInput.addRowListener(new RowComparatorListener(new Object[] {"red"}));
+
+    processRows(jsonInput, 2);
+    assertEquals(0, jsonInput.getErrors(), errorLog.toString());
+    assertEquals(1, jsonInput.getLinesWritten(), "rows written");
+  }
+
@Test
void testArrayOut() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -753,6 +778,37 @@ class JsonInputTest {
assertEquals(1, jsonInput.getLinesWritten(), "rows written");
}
+  /**
+   * Reads a whole JSON array from a JsonNode input value into a String output field and expects
+   * the array's compact JSON text.
+   */
+  @Test
+  void testArrayOut_withJsonNodeInput() throws Exception {
+    ByteArrayOutputStream errorLog = new ByteArrayOutputStream();
+    helper.redirectLog(errorLog, LogLevel.ERROR);
+
+    JsonInputField booksField = new JsonInputField("books (array)");
+    booksField.setPath("$.store.book");
+    booksField.setType(IValueMeta.TYPE_STRING);
+
+    JsonInputMeta meta = createSimpleMeta("json", booksField);
+    meta.setRemoveSourceField(true);
+
+    Object[] inputRow = {getBasicTestJsonNode()};
+    JsonInput jsonInput = createJsonInputWithJsonNode("json", meta, inputRow);
+
+    // The serialized array, exactly as the output row is expected to carry it.
+    String expectedBooks =
+        "[{\"category\":\"reference\",\"author\":\"Nigel Rees\",\"title\":\"Sayings of the Century\",\"price\":8.95},"
+            + "{\"category\":\"fiction\",\"author\":\"Evelyn Waugh\",\"title\":\"Sword of Honour\",\"price\":12.99},"
+            + "{\"category\":\"fiction\",\"author\":\"Herman Melville\",\"title\":\"Moby Dick\","
+            + "\"isbn\":\"0-553-21311-3\",\"price\":8.99},{\"category\":\"fiction\",\"author\":\"J. R. R. Tolkien\","
+            + "\"title\":\"The Lord of the Rings\",\"isbn\":\"0-395-19395-8\",\"price\":22.99}]";
+    jsonInput.addRowListener(new RowComparatorListener(new Object[] {expectedBooks}));
+
+    processRows(jsonInput, 2);
+    assertEquals(0, jsonInput.getErrors(), errorLog.toString());
+    assertEquals(1, jsonInput.getLinesWritten(), "rows written");
+  }
+
@Test
void testObjectOut() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -775,6 +831,30 @@ class JsonInputTest {
assertEquals(1, jsonInput.getLinesWritten(), "rows written");
}
+  /**
+   * Reads a whole JSON object from a JsonNode input value into a String output field and expects
+   * the object's compact JSON text.
+   */
+  @Test
+  void testObjectOut_withJsonNodeInput() throws Exception {
+    ByteArrayOutputStream errorLog = new ByteArrayOutputStream();
+    helper.redirectLog(errorLog, LogLevel.ERROR);
+
+    JsonInputField bicycleField = new JsonInputField("the bicycle (obj)");
+    bicycleField.setPath("$.store.bicycle");
+    bicycleField.setType(IValueMeta.TYPE_STRING);
+
+    JsonInputMeta meta = createSimpleMeta("json", bicycleField);
+    meta.setRemoveSourceField(true);
+
+    Object[] inputRow = {getBasicTestJsonNode()};
+    JsonInput jsonInput = createJsonInputWithJsonNode("json", meta, inputRow);
+
+    String expectedBicycle = "{\"color\":\"red\",\"price\":19.95}";
+    jsonInput.addRowListener(new RowComparatorListener(new Object[] {expectedBicycle}));
+
+    processRows(jsonInput, 2);
+    assertEquals(0, jsonInput.getErrors(), errorLog.toString());
+    assertEquals(1, jsonInput.getLinesWritten(), "rows written");
+  }
+
@Test
void testBicycleAsterisk() throws Exception {
ByteArrayOutputStream out = new ByteArrayOutputStream();
@@ -1142,7 +1222,7 @@ class JsonInputTest {
}
@Test
- void testJsonInputPathResolutionSuccess() {
+ void testJsonInputPathResolutionSuccess() throws Exception {
JsonInputField inputField = new JsonInputField("value");
final String PATH = "${PARAM_PATH}.price";
inputField.setPath(PATH);
@@ -1242,7 +1322,8 @@ class JsonInputTest {
}
protected JsonInput createBasicTestJsonInput(
- String jsonPath, IValueMeta outputMeta, final String inCol, Object[]...
inputRows) {
+ String jsonPath, IValueMeta outputMeta, final String inCol, Object[]...
inputRows)
+ throws HopPluginException {
JsonInputField jpath = new JsonInputField(outputMeta.getName());
jpath.setPath(jsonPath);
jpath.setType(outputMeta.getType());
@@ -1251,17 +1332,29 @@ class JsonInputTest {
return createJsonInput(inCol, meta, inputRows);
}
- protected JsonInput createJsonInput(
- final String inCol, JsonInputMeta meta, Object[]... inputRows) {
+ protected JsonInput createJsonInput(final String inCol, JsonInputMeta meta,
Object[]... inputRows)
+ throws HopPluginException {
return createJsonInput(inCol, meta, null, inputRows);
}
protected JsonInput createJsonInput(
- final String inCol, JsonInputMeta meta, IVariables variables,
Object[]... inputRows) {
+ final String inCol, JsonInputMeta meta, IVariables variables,
Object[]... inputRows)
+ throws HopPluginException {
+ return createJsonInput(inCol, IValueMeta.TYPE_STRING, meta, variables,
inputRows);
+ }
+
+ protected JsonInput createJsonInput(
+ final String inCol,
+ int hopType,
+ JsonInputMeta meta,
+ IVariables variables,
+ Object[]... inputRows)
+ throws HopPluginException {
JsonInputData data = new JsonInputData();
IRowSet input = helper.getMockInputRowSet(inputRows);
- IRowMeta rowMeta = createRowMeta(new ValueMetaString(inCol));
+ IRowMeta rowMeta = new RowMeta();
+ rowMeta.addValueMeta(ValueMetaFactory.createValueMeta(inCol, hopType));
input.setRowMeta(rowMeta);
JsonInput jsonInput =
@@ -1275,6 +1368,20 @@ class JsonInputTest {
return jsonInput;
}
+  /** Parses the string returned by {@code getBasicTestJson()} into a Jackson {@link JsonNode}. */
+  protected static JsonNode getBasicTestJsonNode() throws Exception {
+    final String basicJson = getBasicTestJson();
+    final ObjectMapper mapper = new ObjectMapper();
+    return mapper.readTree(basicJson);
+  }
+
+  /**
+   * Builds a JsonInput that takes its JSON from the incoming field {@code fieldName}, passing
+   * {@link IValueMeta#TYPE_JSON} as the Hop type of that field and no variables.
+   *
+   * <p>Note: {@code meta} is modified: in-fields mode is switched on and the field value is set
+   * to {@code fieldName}.
+   */
+  protected JsonInput createJsonInputWithJsonNode(
+      String fieldName, JsonInputMeta meta, Object[]... inputRows) throws HopPluginException {
+    // Configure the transform to read from a field of the incoming rows.
+    meta.setInFields(true);
+    meta.setFieldValue(fieldName);
+
+    final int sourceFieldType = IValueMeta.TYPE_JSON;
+    return createJsonInput(fieldName, sourceFieldType, meta, null, inputRows);
+  }
+
protected static class RowComparatorListener extends RowAdapter {
Object[][] data;
diff --git
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/mongo/wrapper/field/MongoField.java
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/mongo/wrapper/field/MongoField.java
index 7d822a4884..69312c435a 100644
---
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/mongo/wrapper/field/MongoField.java
+++
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/mongo/wrapper/field/MongoField.java
@@ -17,6 +17,7 @@
package org.apache.hop.mongo.wrapper.field;
+import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import java.math.BigDecimal;
@@ -32,7 +33,9 @@ import org.apache.hop.core.variables.IVariables;
import org.apache.hop.i18n.BaseMessages;
import org.apache.hop.pipeline.transforms.mongodbinput.MongoDbInputData;
import org.bson.BsonUndefined;
+import org.bson.Document;
import org.bson.types.Binary;
+import org.bson.types.Decimal128;
public class MongoField implements Comparable<MongoField> {
protected static final Class<?> PKG = MongoField.class;
@@ -236,6 +239,11 @@ public class MongoField implements Comparable<MongoField> {
return tempValueMeta.getNumber(fieldValue);
case IValueMeta.TYPE_STRING:
return tempValueMeta.getString(fieldValue);
+ case IValueMeta.TYPE_JSON:
+ // Jackson JsonNode handling:
+ // Supports JSON values (and binary type 0), BSON objects like
Date/UUID
+ // are not supported since they're not JSON values
+ return tempValueMeta.getJson(fieldValue);
default:
// UUID support
try {
@@ -415,4 +423,41 @@ public class MongoField implements Comparable<MongoField> {
public void setIndexedVals(String vals) {
indexedValues = MongoDbInputData.indexedValsList(vals);
}
+
+  /**
+   * Recursively converts a Jackson {@link JsonNode} into a plain value for a MongoDB document.
+   *
+   * <ul>
+   *   <li>OBJECT: a {@link Document} with every member converted recursively
+   *   <li>ARRAY: a list with every element converted recursively
+   *   <li>STRING / BOOLEAN: the plain Java value
+   *   <li>NUMBER: {@code Integer} when the integral value fits in an int, {@code Long} when it
+   *       fits in a long, {@link Decimal128} for big decimals and for integers beyond the long
+   *       range, {@code Double} otherwise
+   *   <li>BINARY: a {@link Binary}, or the text representation if the bytes cannot be read
+   *   <li>NULL: {@code null}
+   *   <li>anything else: the node's text representation
+   * </ul>
+   *
+   * @param n the node to convert, may be {@code null}
+   * @return the converted value; {@code null} for a {@code null} argument or a JSON null
+   */
+  public static Object toBsonFromJsonNode(JsonNode n) {
+    if (n == null) {
+      return null;
+    }
+
+    switch (n.getNodeType()) {
+      case OBJECT:
+        {
+          Document d = new Document();
+          // for each entry in JsonNode, create an entry in Document
+          n.fields().forEachRemaining(e -> d.put(e.getKey(), toBsonFromJsonNode(e.getValue())));
+          return d;
+        }
+      case ARRAY:
+        {
+          var list = new java.util.ArrayList<Object>(n.size());
+          n.forEach(el -> list.add(toBsonFromJsonNode(el)));
+          return list;
+        }
+      case NULL:
+        // A JSON null has to stay null; the default branch would turn it into the string "null".
+        return null;
+      case STRING:
+        return n.textValue();
+      case BOOLEAN:
+        return n.booleanValue();
+      case NUMBER:
+        if (n.isIntegralNumber()) {
+          if (!n.canConvertToLong()) {
+            // longValue() would silently truncate here, so keep the exact digits instead.
+            // NOTE(review): values too large for Decimal128 are presumably rejected by parse()
+            // with an exception - confirm this is acceptable for callers.
+            return Decimal128.parse(n.bigIntegerValue().toString());
+          }
+          long v = n.longValue();
+          // Deliberately an if/else and not "cond ? (int) v : v": the conditional operator
+          // promotes both branches to long, so that form always returns a Long.
+          if (v >= Integer.MIN_VALUE && v <= Integer.MAX_VALUE) {
+            return Integer.valueOf((int) v);
+          }
+          return Long.valueOf(v);
+        }
+        if (n.isBigDecimal()) {
+          return Decimal128.parse(n.decimalValue().toPlainString());
+        }
+        return n.doubleValue();
+      case BINARY:
+        try {
+          return new Binary(n.binaryValue());
+        } catch (Exception ignored) {
+          // binary content could not be read: fall through to the string representation
+        }
+        // fallback, string representation
+      default:
+        return n.asText();
+    }
+  }
}
diff --git
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteData.java
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteData.java
index ecba113da9..b9ef927340 100644
---
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteData.java
+++
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteData.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.mongodbdelete;
+import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
import java.util.ArrayList;
@@ -35,6 +36,7 @@ import org.apache.hop.mongo.metadata.MongoDbConnection;
import org.apache.hop.mongo.wrapper.MongoClientWrapper;
import org.apache.hop.mongo.wrapper.collection.MongoCollectionWrapper;
import org.apache.hop.mongo.wrapper.cursor.MongoCursorWrapper;
+import org.apache.hop.mongo.wrapper.field.MongoField;
import org.apache.hop.pipeline.transform.BaseTransformData;
import org.apache.hop.pipeline.transform.ITransformData;
@@ -313,59 +315,80 @@ public class MongoDbDeleteData extends BaseTransformData
implements ITransformDa
return false; // don't insert nulls!
}
- if (valueMeta.isString()) {
- String val = valueMeta.getString(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isBoolean()) {
- Boolean val = valueMeta.getBoolean(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isInteger()) {
- Long val = valueMeta.getInteger(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isDate()) {
- Date val = valueMeta.getDate(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isNumber()) {
- Double val = valueMeta.getNumber(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isBigNumber()) {
- // use string value - user can use Hop to convert back
- String val = valueMeta.getString(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (valueMeta.isBinary()) {
- byte[] val = valueMeta.getBinary(objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- // UUID
- try {
- int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
- if (valueMeta.getType() == uuidTypeId) {
- UUID val = (UUID) valueMeta.convertData(valueMeta, objectValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- } catch (Exception ignore) {
- // UUID plugin not present, fall through
- }
- if (valueMeta.isSerializableType()) {
- throw new HopValueException(
- BaseMessages.getString(PKG,
"MongoDbDelete.ErrorMessage.CantStoreHopSerializableVals"));
+ switch (valueMeta.getType()) {
+ case IValueMeta.TYPE_STRING:
+ {
+ String val = valueMeta.getString(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_BOOLEAN:
+ {
+ Boolean val = valueMeta.getBoolean(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_INTEGER:
+ {
+ Long val = valueMeta.getInteger(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_DATE:
+ {
+ Date val = valueMeta.getDate(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_NUMBER:
+ {
+ Double val = valueMeta.getNumber(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_BIGNUMBER:
+ {
+ // use string value - user can use Hop to convert back
+ String val = valueMeta.getString(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_BINARY:
+ {
+ byte[] val = valueMeta.getBinary(objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_SERIALIZABLE:
+ {
+ throw new HopValueException(
+ BaseMessages.getString(
+ PKG,
"MongoDbDelete.ErrorMessage.CantStoreHopSerializableVals"));
+ }
+ case IValueMeta.TYPE_JSON:
+ {
+ JsonNode node = valueMeta.getJson(objectValue);
+ Object bson = MongoField.toBsonFromJsonNode(node);
+ mongoObject.put(lookup.toString(), bson);
+ return true;
+ }
+ default:
+ {
+ // UUID
+ try {
+ int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
+ if (valueMeta.getType() == uuidTypeId) {
+ UUID val = (UUID) valueMeta.convertData(valueMeta, objectValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ } catch (Exception ignore) {
+ // UUID plugin not present, fall through
+ }
+
+ return false;
+ }
}
-
- return false;
}
/**
diff --git
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteDialog.java
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteDialog.java
index e7a053428c..6380544fb6 100644
---
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteDialog.java
+++
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodbdelete/MongoDbDeleteDialog.java
@@ -870,6 +870,9 @@ public class MongoDbDeleteDialog extends
BaseTransformDialog {
case IValueMeta.TYPE_BINARY:
val = "<binary val>";
break;
+ case IValueMeta.TYPE_JSON:
+ val = "<JSON document>";
+ break;
default:
try {
int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
diff --git
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputData.java
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputData.java
index be7ead860e..7c11454856 100644
---
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputData.java
+++
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputData.java
@@ -17,6 +17,7 @@
package org.apache.hop.pipeline.transforms.mongodboutput;
+import com.fasterxml.jackson.databind.JsonNode;
import com.mongodb.BasicDBList;
import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;
@@ -44,6 +45,7 @@ import org.apache.hop.mongo.metadata.MongoDbConnection;
import org.apache.hop.mongo.wrapper.MongoClientWrapper;
import org.apache.hop.mongo.wrapper.collection.MongoCollectionWrapper;
import org.apache.hop.mongo.wrapper.cursor.MongoCursorWrapper;
+import org.apache.hop.mongo.wrapper.field.MongoField;
import org.apache.hop.pipeline.transform.BaseTransformData;
import org.apache.hop.pipeline.transform.ITransformData;
@@ -710,6 +712,8 @@ public class MongoDbOutputData extends BaseTransformData
implements ITransformDa
IValueMeta vm = inputMeta.getValueMeta(index);
if (!vm.isNull(row[index])) {
String jsonDoc = vm.getString(row[index]);
+ // TODO: change JSON.parse into Document.parse and adjust all the
returned
+ // types since the mongoObject.put() accepts a Document and JSON
is deprecated
return (DBObject) JSON.parse(jsonDoc);
} else {
return null;
@@ -882,65 +886,84 @@ public class MongoDbOutputData extends BaseTransformData
implements ITransformDa
}
}
- if (hopType.isString()) {
- String val = hopType.getString(hopValue);
- if (hopValueIsJSON) {
- Object mongoO = JSON.parse(val);
- mongoObject.put(lookup.toString(), mongoO);
- } else {
- mongoObject.put(lookup.toString(), val);
- }
- return true;
- }
- if (hopType.isBoolean()) {
- Boolean val = hopType.getBoolean(hopValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (hopType.isInteger()) {
- Long val = hopType.getInteger(hopValue);
- mongoObject.put(lookup.toString(), val.longValue());
- return true;
- }
- if (hopType.isDate()) {
- Date val = hopType.getDate(hopValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (hopType.isNumber()) {
- Double val = hopType.getNumber(hopValue);
- mongoObject.put(lookup.toString(), val.doubleValue());
- return true;
- }
- if (hopType.isBigNumber()) {
- // use string value - user can use Hop to convert back
- String val = hopType.getString(hopValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- if (hopType.isBinary()) {
- byte[] val = hopType.getBinary(hopValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- // UUID
- try {
- int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
- if (hopType.getType() == uuidTypeId) {
- UUID val = (UUID) hopType.convertData(hopType, hopValue);
- mongoObject.put(lookup.toString(), val);
- return true;
- }
- } catch (Exception ignore) {
- // UUID plugin not present, fall through
- }
- if (hopType.isSerializableType()) {
- throw new HopValueException(
- BaseMessages.getString(
- PKG,
"MongoDbOutput.Messages.Error.CantStoreHopSerializableVals")); //
- }
+ switch (hopType.getType()) {
+ case IValueMeta.TYPE_STRING:
+ {
+ String val = hopType.getString(hopValue);
+ if (hopValueIsJSON) {
+ Object mongoO = JSON.parse(val);
+ mongoObject.put(lookup.toString(), mongoO);
+ } else {
+ mongoObject.put(lookup.toString(), val);
+ }
+ return true;
+ }
+ case IValueMeta.TYPE_BOOLEAN:
+ {
+ Boolean val = hopType.getBoolean(hopValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_INTEGER:
+ {
+ Long val = hopType.getInteger(hopValue);
+ mongoObject.put(lookup.toString(), val.longValue());
+ return true;
+ }
+ case IValueMeta.TYPE_DATE:
+ {
+ Date val = hopType.getDate(hopValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_NUMBER:
+ {
+ Double val = hopType.getNumber(hopValue);
+ mongoObject.put(lookup.toString(), val.doubleValue());
+ return true;
+ }
+ case IValueMeta.TYPE_BIGNUMBER:
+ {
+ // use string value - user can use Hop to convert back
+ String val = hopType.getString(hopValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_BINARY:
+ {
+ byte[] val = hopType.getBinary(hopValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ case IValueMeta.TYPE_JSON:
+ {
+ JsonNode node = hopType.getJson(hopValue);
+ Object bson = MongoField.toBsonFromJsonNode(node);
+ mongoObject.put(lookup.toString(), bson);
+ return true;
+ }
+ default:
+ {
+ // UUID
+ try {
+ int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
+ if (hopType.getType() == uuidTypeId) {
+ UUID val = (UUID) hopType.convertData(hopType, hopValue);
+ mongoObject.put(lookup.toString(), val);
+ return true;
+ }
+ } catch (Exception ignore) {
+ // UUID plugin not present, fall through
+ }
+ if (hopType.isSerializableType()) {
+ throw new HopValueException(
+ BaseMessages.getString(
+ PKG,
"MongoDbOutput.Messages.Error.CantStoreHopSerializableVals")); //
+ }
- return false;
+ return false;
+ }
+ }
}
private static Object getPathElementName(
diff --git
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputDialog.java
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputDialog.java
index 570d08242f..f54a83365a 100644
---
a/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputDialog.java
+++
b/plugins/transforms/mongodb/src/main/java/org/apache/hop/pipeline/transforms/mongodboutput/MongoDbOutputDialog.java
@@ -1082,6 +1082,9 @@ public class MongoDbOutputDialog extends
BaseTransformDialog {
case IValueMeta.TYPE_BINARY:
val = "<binary val>"; //
break;
+ case IValueMeta.TYPE_JSON:
+ val = "<JSON document>";
+ break;
default:
try {
int uuidTypeId = ValueMetaFactory.getIdForValueMeta("UUID");
diff --git
a/plugins/valuetypes/uuid/src/main/java/org/apache/hop/uuid/ValueMetaUuid.java
b/plugins/valuetypes/uuid/src/main/java/org/apache/hop/uuid/ValueMetaUuid.java
index f97531b38a..b7cd9824a1 100644
---
a/plugins/valuetypes/uuid/src/main/java/org/apache/hop/uuid/ValueMetaUuid.java
+++
b/plugins/valuetypes/uuid/src/main/java/org/apache/hop/uuid/ValueMetaUuid.java
@@ -141,39 +141,6 @@ public class ValueMetaUuid extends ValueMetaBase {
}
@Override
- public int compare(Object data1, Object data2) throws HopValueException {
- boolean n1 = isNull(data1);
- boolean n2 = isNull(data2);
-
- if (n1 && !n2) {
- if (isSortedDescending()) {
- return 1;
- } else {
- return -1;
- }
- }
- if (!n1 && n2) {
- if (isSortedDescending()) {
- return -1;
- } else {
- return 1;
- }
- }
- if (n1 && n2) {
- return 0;
- }
-
- int cmp = 0;
-
- cmp = typeCompare(data1, data2);
-
- if (isSortedDescending()) {
- return -cmp;
- } else {
- return cmp;
- }
- }
-
protected int typeCompare(Object object1, Object object2) throws
HopValueException {
if (object1 instanceof UUID u1 && object2 instanceof UUID u2) {
return u1.compareTo(u2);
@@ -213,7 +180,9 @@ public class ValueMetaUuid extends ValueMetaBase {
// generic fallback to String
preparedStatement.setString(index, u.toString());
} catch (Exception e) {
- throw new HopDatabaseException("Unable to set UUID parameter", e);
+ throw new HopDatabaseException(
+ "Error setting UUID value #" + index + " [" + toStringMeta() + "] on
prepared statement",
+ e);
}
}
@@ -225,7 +194,10 @@ public class ValueMetaUuid extends ValueMetaBase {
return convertData(this, o);
} catch (SQLException e) {
throw new HopDatabaseException(
- "Unable to get value '" + toStringMeta() + "' from database
resultset, index " + index,
+ "Unable to get UUID value '"
+ + toStringMeta()
+ + "' from database resultset, index "
+ + index,
e);
} catch (Exception e) {
throw new HopDatabaseException("Unable to read UUID value", e);