jihoonson commented on a change in pull request #9449:
URL: https://github.com/apache/druid/pull/9449#discussion_r421819452
##########
File path:
core/src/main/java/org/apache/druid/java/util/common/parsers/CloseableIterator.java
##########
@@ -66,10 +66,10 @@ public void close() throws IOException
return new CloseableIterator<R>()
{
- CloseableIterator<R> iterator = findNextIeteratorIfNecessary();
+ CloseableIterator<R> iterator = findNextIteratorIfNecessary();
Review comment:
👍
##########
File path: docs/ingestion/native-batch.md
##########
@@ -1310,6 +1311,43 @@ A spec that applies a filter and reads a subset of the
original datasource's col
This spec above will only return the `page`, `user` dimensions and `added`
metric.
Only rows where `page` = `Druid` will be returned.
+### Sql Input Source
+
+The SQL input source is used to read data directly from RDBMS.
+The SQL input source is _splittable_ and can be used by the [Parallel
task](#parallel-task), where each worker task will read from one SQL query from
the list of queries.
+Since this input source has a fixed input format for reading events, no
`inputFormat` field needs to be specified in the ingestion spec when using this
input source.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "sql".|Yes|
+|database|Specifies the database connection details.|Yes|
+|foldCase|Toggle case folding of database column names. This may be enabled in
cases where the database returns case insensitive column names in query
results.|No|
+|sqls|List of SQL queries where each SQL query would retrieve the data to be
indexed.|Yes|
+
+An example SqlInputSource spec is shown below:
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "sql",
+ "database": {
+ "type": "mysql",
+ "connectorConfig": {
+ "connectURI": "jdbc:mysql://host:port/schema",
+ "user": "user",
+ "password": "password"
+ }
+ },
+ "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
+ },
+...
+```
+
+The spec above will read all events from two separate sqls
+within the interval `2013-01-01/2013-01-02`.
Review comment:
Maybe worth mentioning one more time that these SQLs are executed in two
sub tasks when you run a Parallel task.
##########
File path: server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.prefetch.JsonIterator;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class SqlReader extends IntermediateRowParsingReader<Map<String,
Object>>
+{
+ private final InputRowSchema inputRowSchema;
+ private final SqlEntity source;
+ private final File temporaryDirectory;
+ private final ObjectMapper objectMapper;
+
+
+ SqlReader(
+ InputRowSchema inputRowSchema,
+ InputEntity source,
+ File temporaryDirectory,
+ ObjectMapper objectMapper
+ )
+ {
+ this.inputRowSchema = inputRowSchema;
+ this.source = (SqlEntity) source;
+ this.temporaryDirectory = temporaryDirectory;
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ protected CloseableIterator<Map<String, Object>> intermediateRowIterator()
throws IOException
+ {
+ final Closer closer = Closer.create();
+ final InputEntity.CleanableFile resultFile =
closer.register(source.fetch(temporaryDirectory, null));
+ FileInputStream inputStream = new FileInputStream(resultFile.file());
+ JsonIterator<Map<String, Object>> jsonIterator = new JsonIterator<>(new
TypeReference<Map<String, Object>>()
Review comment:
nit: this doesn't have to be done in this PR, but how about making
`JsonIterator` a `CloseableIterator`? It already implements `Iterator` and
`Closeable` so it would be pretty simple.
##########
File path: server/src/main/java/org/apache/druid/metadata/input/SqlReader.java
##########
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.druid.metadata.input;
+
+import com.fasterxml.jackson.core.type.TypeReference;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.druid.data.input.InputEntity;
+import org.apache.druid.data.input.InputRow;
+import org.apache.druid.data.input.InputRowSchema;
+import org.apache.druid.data.input.IntermediateRowParsingReader;
+import org.apache.druid.data.input.impl.MapInputRowParser;
+import org.apache.druid.data.input.impl.prefetch.JsonIterator;
+import org.apache.druid.java.util.common.io.Closer;
+import org.apache.druid.java.util.common.parsers.CloseableIterator;
+import org.apache.druid.java.util.common.parsers.ParseException;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+public class SqlReader extends IntermediateRowParsingReader<Map<String,
Object>>
+{
+ private final InputRowSchema inputRowSchema;
+ private final SqlEntity source;
+ private final File temporaryDirectory;
+ private final ObjectMapper objectMapper;
+
+
+ SqlReader(
+ InputRowSchema inputRowSchema,
+ InputEntity source,
+ File temporaryDirectory,
+ ObjectMapper objectMapper
+ )
+ {
+ this.inputRowSchema = inputRowSchema;
+ this.source = (SqlEntity) source;
+ this.temporaryDirectory = temporaryDirectory;
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ protected CloseableIterator<Map<String, Object>> intermediateRowIterator()
throws IOException
+ {
+ final Closer closer = Closer.create();
+ final InputEntity.CleanableFile resultFile =
closer.register(source.fetch(temporaryDirectory, null));
Review comment:
Could you add a comment on why we fetch all result in local storage
first? I remember this is to avoid holding database connections for too long
time. It would help other developers.
##########
File path: docs/ingestion/native-batch.md
##########
@@ -1310,6 +1311,43 @@ A spec that applies a filter and reads a subset of the
original datasource's col
This spec above will only return the `page`, `user` dimensions and `added`
metric.
Only rows where `page` = `Druid` will be returned.
+### Sql Input Source
+
+The SQL input source is used to read data directly from RDBMS.
+The SQL input source is _splittable_ and can be used by the [Parallel
task](#parallel-task), where each worker task will read from one SQL query from
the list of queries.
+Since this input source has a fixed input format for reading events, no
`inputFormat` field needs to be specified in the ingestion spec when using this
input source.
+
+|property|description|required?|
+|--------|-----------|---------|
+|type|This should be "sql".|Yes|
+|database|Specifies the database connection details.|Yes|
+|foldCase|Toggle case folding of database column names. This may be enabled in
cases where the database returns case insensitive column names in query
results.|No|
+|sqls|List of SQL queries where each SQL query would retrieve the data to be
indexed.|Yes|
+
+An example SqlInputSource spec is shown below:
+
+```json
+...
+ "ioConfig": {
+ "type": "index_parallel",
+ "inputSource": {
+ "type": "sql",
+ "database": {
+ "type": "mysql",
+ "connectorConfig": {
+ "connectURI": "jdbc:mysql://host:port/schema",
+ "user": "user",
+ "password": "password"
+ }
+ },
+ "sqls": ["SELECT * FROM table1", "SELECT * FROM table2"]
+ },
+...
+```
+
+The spec above will read all events from two separate sqls
Review comment:
`sqls` should be properly capitalized as `SQL`s.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]