Hisoka-X commented on code in PR #9103:
URL: https://github.com/apache/seatunnel/pull/9103#discussion_r2077285103


##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathUtils.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.util;
+
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ReadContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utility class for JsonPath operations. */
+public class JsonPathUtils {
+
+    private static final Option[] DEFAULT_OPTIONS = {
+        Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, 
Option.DEFAULT_PATH_LEAF_TO_NULL
+    };
+
+    private static final Configuration JSON_CONFIGURATION =
+            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+
+    /**
+     * Creates a ReadContext from a JSON string.
+     *
+     * @param json The JSON string
+     * @return A ReadContext for the JSON
+     */
+    public static ReadContext parseJson(String json) {
+        return JsonPath.using(JSON_CONFIGURATION).parse(json);
+    }
+
+    /**
+     * Extracts data from JSON using JsonPaths.
+     *
+     * @param json The JSON string to parse
+     * @param jsonField The JsonField containing path mappings
+     * @param allowNullFields Whether to allow missing fields
+     * @return List of data extracted from JSON
+     */
+    public static List<Map<String, String>> extractData(
+            String json, JsonField jsonField, boolean allowNullFields) {
+        // Parse JSON into ReadContext
+        ReadContext jsonReadContext = parseJson(json);
+
+        // Convert JsonField to JsonPath array
+        JsonPath[] jsonPaths = createJsonPaths(jsonField);
+
+        // Get appropriate processor with jsonFiledMissedReturnNull flag
+        JsonPathProcessor processor =
+                JsonPathProcessorFactory.getProcessor(jsonPaths, 
allowNullFields);
+
+        // Process JSON data based on the processor
+        List<List<String>> results = 
processor.processJsonData(jsonReadContext, jsonPaths);
+
+        // Convert results to maps
+        return parseToMap(results, jsonField);
+    }
+
+    /**
+     * Creates JsonPath array from JsonField.
+     *
+     * @param jsonField The JsonField to convert
+     * @return Array of JsonPath objects
+     */
+    public static JsonPath[] createJsonPaths(JsonField jsonField) {
+        if (jsonField == null || jsonField.getFields() == null || 
jsonField.getFields().isEmpty()) {
+            throw new HttpConnectorException(
+                    HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
+                    "JsonField cannot be null or empty");
+        }
+
+        JsonPath[] jsonPaths = new JsonPath[jsonField.getFields().size()];
+        int index = 0;
+        for (String pathString : jsonField.getFields().values()) {
+            jsonPaths[index++] = JsonPath.compile(pathString);
+        }
+
+        return jsonPaths;
+    }
+
+    /**
+     * Converts parsed data to a list of maps.
+     *
+     * @param data The raw data (list of lists)
+     * @param jsonField The JsonField containing field names
+     * @return List of maps with field names as keys
+     */
+    public static List<Map<String, String>> parseToMap(
+            List<List<String>> data, JsonField jsonField) {
+        List<Map<String, String>> resultList = new ArrayList<>(data.size());
+        String[] keys = jsonField.getFields().keySet().toArray(new String[0]);
+
+        for (List<String> row : data) {
+            Map<String, String> resultMap = new 
HashMap<>(jsonField.getFields().size());
+            for (int i = 0; i < row.size(); i++) {
+                resultMap.put(keys[i], row.get(i));
+            }
+            resultList.add(resultMap);
+        }
+
+        return resultList;
+    }
+
+    /**
+     * Extracts the common parent path from JsonPaths.
+     *
+     * @param paths Array of JsonPath objects
+     * @return The common parent path
+     */
+    public static String extractCommonParentPath(JsonPath[] paths) {
+        return 
JsonPathProcessorFactory.getProcessor(paths).extractCommonParentPath(paths);
+    }
+
+    /**
+     * Gets a relative path from a parent path and a full path.
+     *
+     * @param parentPath The parent path
+     * @param fullPath The full path
+     * @return The relative path
+     */
+    public static String getRelativePath(String parentPath, String fullPath) {
+        return JsonPathProcessorFactory.getProcessor(fullPath)
+                .getRelativePath(parentPath, fullPath);
+    }

Review Comment:
   This method looks useless too. Is it still needed?



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/JsonPathUtils.java:
##########
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.util;
+
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
+
+import com.jayway.jsonpath.Configuration;
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.Option;
+import com.jayway.jsonpath.ReadContext;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/** Utility class for JsonPath operations. */
+public class JsonPathUtils {
+
+    private static final Option[] DEFAULT_OPTIONS = {
+        Option.SUPPRESS_EXCEPTIONS, Option.ALWAYS_RETURN_LIST, 
Option.DEFAULT_PATH_LEAF_TO_NULL
+    };
+
+    private static final Configuration JSON_CONFIGURATION =
+            Configuration.defaultConfiguration().addOptions(DEFAULT_OPTIONS);
+
+    /**
+     * Creates a ReadContext from a JSON string.
+     *
+     * @param json The JSON string
+     * @return A ReadContext for the JSON
+     */
+    public static ReadContext parseJson(String json) {
+        return JsonPath.using(JSON_CONFIGURATION).parse(json);
+    }
+
+    /**
+     * Extracts data from JSON using JsonPaths.
+     *
+     * @param json The JSON string to parse
+     * @param jsonField The JsonField containing path mappings
+     * @param allowNullFields Whether to allow missing fields
+     * @return List of data extracted from JSON
+     */
+    public static List<Map<String, String>> extractData(

Review Comment:
   This method looks useless. Is it still needed?



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/source/HttpSourceReader.java:
##########
@@ -467,8 +447,8 @@ private String getPartOfJson(String data) {
     }
 
     private List<List<String>> dataFlip(List<List<String>> results) {

Review Comment:
   This method looks useless now. Is it still needed?



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/AbstractJsonPathProcessor.java:
##########
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.util;
+
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
+
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.ReadContext;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Abstract implementation of JsonPathProcessor providing common 
functionality. */
+public abstract class AbstractJsonPathProcessor implements JsonPathProcessor {
+
+    /** Flag to indicate whether to return null for missing fields */
+    private boolean jsonFiledMissedReturnNull = false;
+
+    /**
+     * Set whether to return null for missing fields.
+     *
+     * @param jsonFiledMissedReturnNull true to return null for missing 
fields, false otherwise
+     */
+    public void setJsonFiledMissedReturnNull(boolean 
jsonFiledMissedReturnNull) {
+        this.jsonFiledMissedReturnNull = jsonFiledMissedReturnNull;
+    }
+
+    /**
+     * Check if json fields with missing values should return null. This is 
used to determine
+     * whether to validate result consistency.
+     *
+     * @return true if missing fields should return null, false otherwise
+     */
+    protected boolean isJsonFiledMissedReturnNull() {
+        return jsonFiledMissedReturnNull;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public abstract String extractCommonParentPath(JsonPath[] paths);
+
+    /** {@inheritDoc} */
+    @Override
+    public abstract String getRelativePath(String parentPath, String fullPath);
+
+    /** {@inheritDoc} */
+    @Override
+    public abstract boolean canProcess(String pathString);
+
+    /** {@inheritDoc} */
+    @Override
+    public List<List<String>> processJsonData(ReadContext jsonReadContext, 
JsonPath[] paths) {
+        // Default implementation - can be overridden by subclasses
+        List<List<String>> results = new ArrayList<>(paths.length);
+
+        // Read all paths
+        for (JsonPath path : paths) {
+            results.add(jsonReadContext.read(path));
+        }
+
+        // Only validate consistency if jsonFiledMissedReturnNull is false
+        boolean shouldValidate = !isJsonFiledMissedReturnNull();
+        if (shouldValidate) {
+            validateResultsConsistency(results, paths);
+        }
+
+        return dataFlip(results);
+    }
+
+    /**
+     * Helper method to validate that all results have the same size.
+     *
+     * @param results The list of results to validate
+     * @param paths The JsonPath objects used to generate the results
+     * @throws HttpConnectorException if results are inconsistent
+     */
+    protected void validateResultsConsistency(List<List<String>> results, 
JsonPath[] paths) {
+        if (results.isEmpty()) {
+            return;
+        }
+
+        int expectedSize = results.get(0).size();
+        for (int i = 1; i < results.size(); i++) {
+            if (results.get(i).size() != expectedSize) {
+                throw new HttpConnectorException(
+                        HttpConnectorErrorCode.FIELD_DATA_IS_INCONSISTENT,
+                        String.format(
+                                "[%s](%d) and [%s](%d) the number of parsing 
records is inconsistent.",
+                                paths[0].getPath(),
+                                expectedSize,
+                                paths[i].getPath(),
+                                results.get(i).size()));
+            }
+        }
+    }
+
+    /**
+     * Flips a matrix of results so that rows become columns and vice versa.
+     *
+     * @param results The original data matrix
+     * @return The flipped data matrix
+     */
+    protected List<List<String>> dataFlip(List<List<String>> results) {
+        List<List<String>> datas = new ArrayList<>();
+
+        for (int i = 0; i < results.size(); i++) {
+            List<String> result = results.get(i);
+            if (i == 0) {
+                for (Object o : result) {
+                    String val = o == null ? null : o.toString();
+                    List<String> row = new ArrayList<>(results.size());
+                    row.add(val);
+                    datas.add(row);
+                }
+            } else {
+                for (int j = 0; j < result.size(); j++) {
+                    Object o = result.get(j);
+                    String val = o == null ? null : o.toString();
+                    List<String> row = datas.get(j);
+                    row.add(val);
+                }
+            }
+        }
+
+        return datas;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String extractValue(ReadContext objContext, String relativePath) {
+        try {
+            Object value = objContext.read(relativePath);
+            if (value == null) {
+                return null;
+            }
+            if (value instanceof List) {
+                List<?> list = (List<?>) value;
+                return !list.isEmpty() ? list.get(0).toString() : null;

Review Comment:
   I think returning the whole list as a string, instead of returning only its 
first element, is a better choice.



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/http/util/ObjectJsonPathProcessor.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http.util;
+
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.http.exception.HttpConnectorException;
+
+import com.jayway.jsonpath.JsonPath;
+import com.jayway.jsonpath.ReadContext;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+/** Processor for handling JsonPath with dot notation (standard object 
notation). */
+public class ObjectJsonPathProcessor extends AbstractJsonPathProcessor {
+
+    /** {@inheritDoc} */
+    @Override
+    public boolean canProcess(String pathString) {
+        // Check if path uses dot notation and doesn't use array notation or 
bracket notation
+        return pathString.contains(".")
+                && !pathString.contains("[*]")
+                && !pathString.contains("['")
+                && !pathString.contains("[\"");
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public String extractCommonParentPath(JsonPath[] paths) {

Review Comment:
   This method looks useless too. Is it still needed?



##########
seatunnel-connectors-v2/connector-http/connector-http-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/http/JsonFieldMissedReturnNullTest.java:
##########
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.http;
+
+import org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
+
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.BasicType;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import 
org.apache.seatunnel.connectors.seatunnel.http.client.HttpClientProvider;
+import org.apache.seatunnel.connectors.seatunnel.http.client.HttpResponse;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpParameter;
+import org.apache.seatunnel.connectors.seatunnel.http.config.HttpRequestMethod;
+import org.apache.seatunnel.connectors.seatunnel.http.config.JsonField;
+import org.apache.seatunnel.connectors.seatunnel.http.source.HttpSourceReader;
+import 
org.apache.seatunnel.connectors.seatunnel.http.source.SimpleTextDeserializationSchema;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Mock;
+import org.mockito.MockitoAnnotations;
+
+import java.lang.reflect.Field;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+public class JsonFieldMissedReturnNullTest {
+
+    private HttpParameter httpParameter;
+    private JsonField jsonField;
+    private SimpleTextDeserializationSchema deserializationSchema;
+
+    @Mock private SingleSplitReaderContext context;
+
+    @Mock private Collector<SeaTunnelRow> collector;
+
+    @Mock private HttpClientProvider httpClientProvider;
+
+    @Mock private HttpResponse httpResponse;
+
+    @BeforeEach
+    public void setUp() {
+        MockitoAnnotations.openMocks(this);
+
+        httpParameter = new HttpParameter();
+        httpParameter.setUrl("http://test-url.com");
+        httpParameter.setMethod(HttpRequestMethod.GET);
+
+        Map<String, String> fields = new HashMap<>();
+        fields.put("key1", "$.result.key1");
+        fields.put("key2", "$.result2.key2.key2");
+        jsonField = JsonField.builder().fields(fields).build();
+
+        // Create the schema with two string fields
+        SeaTunnelRowType rowType =
+                new SeaTunnelRowType(
+                        new String[] {"key1", "key2"},
+                        new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
+        deserializationSchema = new SimpleTextDeserializationSchema(rowType);
+
+        // Setup mocks
+        when(httpResponse.getCode()).thenReturn(200);
+        when(collector.getCheckpointLock()).thenReturn(new Object());
+    }
+
+    @Test
+    public void testJsonFieldMissedReturnNull() throws Exception {
+        // Test data with missing fields  Non-array, no common parent path
+        String testJsonData =
+                "{\n"
+                        + "  \"result\": {\n"
+                        + "    \"key1\": \"value1\"\n"
+                        + "  },\n"
+                        + "  \"result2\": {}\n"
+                        + "}";
+
+        // Set json_filed_missed_return_null to true
+        httpParameter.setJsonFiledMissedReturnNull(true);
+
+        // Setup HTTP response
+        when(httpResponse.getContent()).thenReturn(testJsonData);
+        when(httpClientProvider.execute(
+                        anyString(), anyString(), any(), any(), any(), 
any(Boolean.class)))
+                .thenReturn(httpResponse);
+
+        // Create HttpSourceReader
+        HttpSourceReader sourceReader =
+                new HttpSourceReader(
+                        httpParameter, context, deserializationSchema, 
jsonField, null);
+
+        // Use reflection to inject our mocked HTTP client
+        sourceReader.open(); // This creates the real HTTP client
+        Field httpClientField = 
HttpSourceReader.class.getDeclaredField("httpClient");
+        httpClientField.setAccessible(true);
+        httpClientField.set(sourceReader, httpClientProvider);

Review Comment:
   Let's use our reflection utility (`ReflectionUtils`) here instead of hand-written reflection: 
https://github.com/apache/seatunnel/blob/051d19c3a9b0b580532b5ef27ba01695ded2d048/seatunnel-common/src/main/java/org/apache/seatunnel/common/utils/ReflectionUtils.java#L79



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to