[
https://issues.apache.org/jira/browse/DRILL-8005?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17426116#comment-17426116
]
ASF GitHub Bot commented on DRILL-8005:
---------------------------------------
dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724060422
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcQueryBuilder.java
##########
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.store.jdbc.JdbcRecordWriter;
+import org.apache.parquet.Strings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.JDBCType;
+
+public class JdbcQueryBuilder {
+  private static final Logger logger = LoggerFactory.getLogger(JdbcQueryBuilder.class);
+ public static final int DEFAULT_VARCHAR_PRECISION = 100;
+
+ private static final String CREATE_TABLE_QUERY = "CREATE TABLE %s (";
+ private final StringBuilder createTableQuery;
+ private SqlDialect dialect;
+ private StringBuilder columns;
+
+ public JdbcQueryBuilder(String tableName, SqlDialect dialect) {
+ if (Strings.isNullOrEmpty(tableName)) {
+ throw new UnsupportedOperationException("Table name cannot be empty");
+ }
+ this.dialect = dialect;
+ createTableQuery = new StringBuilder();
+ createTableQuery.append(String.format(CREATE_TABLE_QUERY, tableName));
+ columns = new StringBuilder();
+ }
+
+ /**
+ * Adds a column to the CREATE TABLE statement
+ * @param colName The column to be added to the table
+ * @param type The Drill MinorType of the column
+ * @param nullable If the column is nullable or not.
+ * @param precision The precision, or overall length of a column
+ * @param scale The scale, or number of digits after the decimal
+ */
+  public void addColumn(String colName, MinorType type, boolean nullable, int precision, int scale) {
+ StringBuilder queryText = new StringBuilder();
+ String jdbcColType = "";
+ try {
+      jdbcColType = JDBCType.valueOf(JdbcRecordWriter.JDBC_TYPE_MAPPINGS.get(type)).getName();
+ } catch (NullPointerException e) {
+ // JDBC Does not support writing complex fields to databases
+ throw UserException.dataWriteError()
+        .message("Drill does not support writing complex fields to JDBC data sources.")
+ .addContext(colName + " is a complex type.")
+ .build(logger);
+ }
+
+ queryText.append(colName).append(" ").append(jdbcColType);
+
+ // Add precision or scale if applicable
+ if (jdbcColType.equals("VARCHAR")) {
+ int max_precision = Math.max(precision, DEFAULT_VARCHAR_PRECISION);
+ queryText.append("(").append(max_precision).append(")");
+ }
+
+ if (!nullable) {
+ queryText.append(" NOT NULL");
+ }
+
+ if (! Strings.isNullOrEmpty(columns.toString())) {
+ columns.append(",\n");
+ }
+
+ columns.append(queryText);
+ }
+
+ /**
+ * Generates the CREATE TABLE query.
+ * @return The create table query.
+ */
+ public String getCreateTableQuery() {
+ createTableQuery.append(columns);
+ createTableQuery.append("\n)");
+ return createTableQuery.toString();
+ }
+
+ @Override
+ public String toString() {
+ return getCreateTableQuery();
+ }
+
+ /**
+   * This function adds the appropriate catalog, schema and table for the FROM clauses for INSERT queries
+ * @param table The table
+ * @param catalog The database catalog
+ * @param schema The database schema
+ * @return The table with catalog and schema added, if present
+ */
+  public static String buildCompleteTableName(String table, String catalog, String schema) {
Review comment:
Quite often RDBMSes allow spaces, and possibly other tricky characters,
to be used in table names. Trouble is that they differ in how they want
such identifiers enclosed in those cases, e.g. `[spaced out table]` vs
`` `spaced out table` ``. Do we want to raise an error or warning here if
e.g. a regexp sees some of these characters? Otherwise I think later
`INSERT` statements could break.
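To illustrate the kind of check suggested above (a hypothetical helper, not part of the PR): a regexp can flag identifiers that would need dialect-specific quoting before they reach a `CREATE TABLE` or `INSERT` statement.

```java
import java.util.regex.Pattern;

public class IdentifierCheck {
  // Plain, unquoted SQL identifiers: a letter or underscore followed by
  // letters, digits or underscores. Anything else needs dialect-specific
  // quoting: [brackets] on SQL Server, backticks on MySQL, double quotes
  // in ANSI SQL.
  private static final Pattern PLAIN_IDENTIFIER =
      Pattern.compile("^[A-Za-z_][A-Za-z0-9_]*$");

  public static boolean needsQuoting(String identifier) {
    return !PLAIN_IDENTIFIER.matcher(identifier).matches();
  }
}
```

A caller could then either reject such names up front or route them through the dialect's quoting rules.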
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/CapitalizingJdbcSchema.java
##########
@@ -93,6 +110,65 @@ void setHolder(SchemaPlus plusOfThis) {
return inner.getTableNames();
}
+
+ @Override
+  public CreateTableEntry createNewTable(String tableName, List<String> partitionColumns, StorageStrategy strategy) {
+ if (! plugin.getConfig().isWritable()) {
+ throw UserException
+ .dataWriteError()
+ .message(plugin.getName() + " is not writable.")
+ .build(logger);
+ }
+
+ return new CreateTableEntry() {
+
+ @Override
+ public Writer getWriter(PhysicalOperator child) throws IOException {
+        String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+ return new JdbcWriter(child, tableWithSchema, inner, plugin);
+ }
+
+ @Override
+ public List<String> getPartitionColumns() {
+ return Collections.emptyList();
+ }
+ };
+ }
+
+ @Override
+ public void dropTable(String tableName) {
+    String tableWithSchema = JdbcQueryBuilder.buildCompleteTableName(tableName, catalog, schema);
+ String dropTableQuery = String.format("DROP TABLE %s", tableWithSchema);
+    dropTableQuery = JdbcDDLQueryUtils.cleanDDLQuery(dropTableQuery, plugin.getDialect());
+
+ try {
+ Connection conn = inner.getDataSource().getConnection();
Review comment:
Do we need a `finally { conn.close() }` for this?
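For reference, try-with-resources gives the same guarantee as a `finally { conn.close() }` with less ceremony: `close()` runs before the `catch` block, whether or not the body threw. A minimal stand-in demonstrating the ordering (`FakeConnection` is hypothetical, just to make the behavior observable):

```java
import java.util.ArrayList;
import java.util.List;

public class CloseOrderDemo {
  static final List<String> events = new ArrayList<>();

  // Stand-in for java.sql.Connection: records when close() runs.
  static class FakeConnection implements AutoCloseable {
    void execute() {
      events.add("execute");
      throw new RuntimeException("boom");
    }
    @Override
    public void close() {
      events.add("close");
    }
  }

  static void run() {
    // try-with-resources: close() is invoked during resource release,
    // before control reaches the catch block, even though execute() threw.
    try (FakeConnection conn = new FakeConnection()) {
      conn.execute();
    } catch (RuntimeException e) {
      events.add("caught");
    }
  }
}
```

Applied to `dropTable`, that would mean wrapping both the `Connection` and the `Statement` in one try-with-resources header so both are released on every path.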
##########
File path:
contrib/storage-jdbc/src/test/java/org/apache/drill/exec/store/jdbc/TestJdbcPluginWithPostgres.java
##########
@@ -0,0 +1,316 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.drill.categories.JdbcStorageTest;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.physical.rowSet.DirectRowSet;
+import org.apache.drill.exec.physical.rowSet.RowSet;
+import org.apache.drill.exec.record.metadata.SchemaBuilder;
+import org.apache.drill.exec.record.metadata.TupleMetadata;
+import org.apache.drill.test.ClusterFixture;
+import org.apache.drill.test.ClusterTest;
+import org.apache.drill.test.rowSet.RowSetUtilities;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.testcontainers.containers.PostgreSQLContainer;
+import org.testcontainers.containers.JdbcDatabaseContainer;
+import org.testcontainers.utility.DockerImageName;
+
+import java.math.BigDecimal;
+import java.time.LocalDate;
+import java.util.TimeZone;
+
+import static org.junit.Assert.assertEquals;
+
+/**
+ * JDBC storage plugin tests against Postgres.
+ */
+@Category(JdbcStorageTest.class)
+public class TestJdbcPluginWithPostgres extends ClusterTest {
Review comment:
The contents of this class look like they're not writer-related and
should have been there before... was it just something we didn't have?
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcCatalogSchema.java
##########
@@ -56,7 +56,7 @@
while (set.next()) {
final String catalogName = set.getString(1);
CapitalizingJdbcSchema schema = new CapitalizingJdbcSchema(
-        getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive);
+        getSchemaPath(), catalogName, source, dialect, convention, catalogName, null, caseSensitive, convention.getPlugin());
Review comment:
If CapitalizingJdbcSchema can only correctly use the JdbcStoragePlugin
returned by `convention.getPlugin()` then I'd consider not adding to the
constructor args here, but letting CapitalizingJdbcSchema do the lookup
`convention.getPlugin()` itself, in the constructor.
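A sketch of that suggestion using simplified, hypothetical stand-ins for the real classes (which carry much more state): the schema derives the plugin from the convention it already receives, rather than taking it as an extra constructor argument at every call site.

```java
public class SchemaLookupDemo {
  static class JdbcStoragePlugin {
    private final String name;
    JdbcStoragePlugin(String name) { this.name = name; }
    String getName() { return name; }
  }

  static class Convention {
    private final JdbcStoragePlugin plugin;
    Convention(JdbcStoragePlugin plugin) { this.plugin = plugin; }
    JdbcStoragePlugin getPlugin() { return plugin; }
  }

  static class CapitalizingJdbcSchema {
    private final JdbcStoragePlugin plugin;

    // Look the plugin up from the convention in the constructor rather
    // than threading it through as a separate parameter.
    CapitalizingJdbcSchema(Convention convention) {
      this.plugin = convention.getPlugin();
    }

    String pluginName() { return plugin.getName(); }
  }
}
```

The call sites then stay unchanged when the schema grows new plugin-dependent behavior.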
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/utils/JdbcDDLQueryUtils.java
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc.utils;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.parser.SqlParseException;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.calcite.sql.validate.SqlConformanceEnum;
+import org.apache.calcite.sql.parser.ddl.SqlDdlParserImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class JdbcDDLQueryUtils {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcDDLQueryUtils.class);
+ /**
+   * Converts a given SQL query from the generic dialect to the destination system dialect.
+   * Returns null if the original query is not valid.
+ *
+ * @param query An ANSI SQL statement
+ * @param dialect The destination system dialect
+ * @return A representation of the original query in the destination dialect
+ */
+ public static String cleanDDLQuery(String query, SqlDialect dialect) {
+ SqlParser.Config sqlParserConfig = SqlParser.configBuilder()
+ .setParserFactory(SqlDdlParserImpl.FACTORY)
+ .setConformance(SqlConformanceEnum.MYSQL_5)
+ .setCaseSensitive(true)
+ .setLex(Lex.MYSQL_ANSI)
+ .build();
+
+ try {
+ SqlNode node = SqlParser.create(query, sqlParserConfig).parseQuery();
+ String cleanSQL = node.toSqlString(dialect).getSql();
+
+ // TODO Fix this hack
+      // HACK See CALCITE-4820 (https://issues.apache.org/jira/browse/CALCITE-4820)
Review comment:
Did you see the response from the Calcite team on CALCITE-4820?
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+ public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+ private final String tableName;
+ private final Connection connection;
+ private final JdbcWriter config;
+ private final SqlDialect dialect;
+ private final List<Object> rowList;
+ private final List<String> insertRows;
+ private final List<JdbcWriterField> fields;
+ private StringBuilder rowString;
+
+ /*
+   * This map maps JDBC data types to their Drill equivalents. The basic strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+   * mapped to VARBINARY.
+ */
+ static {
+ JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+ .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+ .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+ .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+ .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+ .put(MinorType.INT, java.sql.Types.INTEGER)
+ .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+ .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+ .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+ .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+ .put(MinorType.DATE, java.sql.Types.DATE)
+ .put(MinorType.TIME, java.sql.Types.TIME)
+ .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+ .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+ .build();
+ }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+ this.tableName = name;
+ this.config = config;
+ rowList = new ArrayList<>();
+ insertRows = new ArrayList<>();
+ this.dialect = config.getPlugin().getDialect();
+
+ this.fields = new ArrayList<>();
+
+ try {
+ this.connection = source.getConnection();
+ } catch (SQLException e) {
+ throw UserException.connectionError()
+ .message("Unable to open JDBC connection for writing.")
+ .addContext(e.getSQLState())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> writerOptions) {
+
+ }
+
+ @Override
+ public void updateSchema(VectorAccessible batch) {
+ BatchSchema schema = batch.getSchema();
+ String columnName;
+ MinorType type;
+ String sql;
+ Statement statement;
+ boolean nullable = false;
+ JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+ for (MaterializedField field : schema) {
+ columnName = field.getName();
+ type = field.getType().getMinorType();
+ logger.debug("Adding column {} of type {}.", columnName, type);
+
+ if (field.getType().getMode() == DataMode.REPEATED) {
+ throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+ .build(logger);
+ }
+
+ if (field.getType().getMode() == DataMode.OPTIONAL) {
+ nullable = true;
+ }
+
+ int precision = field.getPrecision();
+ int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+ }
+
+ sql = queryBuilder.getCreateTableQuery();
+ sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+ logger.debug("Final query: {}", sql);
+
+ // Execute the query to build the schema
+ try {
+ statement = connection.createStatement();
+ logger.debug("Executing CREATE query: {}", sql);
+ statement.execute(sql);
+ statement.close();
+ } catch (SQLException e) {
+ throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the schema. ")
+ .addContext("Sql", sql)
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ rowString = new StringBuilder();
+ rowList.clear();
+ rowString.append("(");
+ logger.debug("Start record");
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ logger.debug("Ending record");
+
+ // Add values to rowString
+ for (int i = 0; i < rowList.size(); i++) {
+ if (i > 0) {
+ rowString.append(", ");
+ }
+
+ // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+ rowString.append("null");
+ continue;
+ }
+
+ JdbcWriterField currentField = fields.get(i);
+ if (currentField.getDataType() == MinorType.VARCHAR) {
+ String value = null;
+ // Get the string value
+ if (currentField.getMode() == DataMode.REQUIRED) {
+ VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+ // Escape any naughty characters
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } else {
+ try {
+            NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+            value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } catch (ClassCastException e) {
+ logger.error("Unable to read field: {}", rowList.get(i));
+ }
+ }
+
+ // Add to value string
+ rowString.append(value);
+ //rowString.append("'").append(value).append("'");
+ } else if (currentField.getDataType() == MinorType.DATE) {
+ String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(dateString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIME) {
+ String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else {
+ rowString.append(rowList.get(i));
+ }
+ }
+
+ rowString.append(")");
+ rowList.clear();
+ insertRows.add(rowString.toString());
+ logger.debug("End record: {}", rowString.toString());
+ }
+
+ @Override
+ public void abort() throws IOException {
+ logger.debug("Abort insert.");
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ logger.debug("Cleanup record");
+ // Execute query
+ String insertQuery = buildInsertQuery();
+
+ try {
+ logger.debug("Executing insert query: {}", insertQuery);
+ Statement stmt = connection.createStatement();
+ stmt.execute(insertQuery);
+ logger.debug("Query complete");
+ // Close connection
+ AutoCloseables.closeSilently(stmt, connection);
+ } catch (SQLException e) {
+ logger.error("Error: {} ", e.getMessage());
+ throw new IOException();
+ }
+ }
+
+ private String buildInsertQuery() {
+ StringBuilder values = new StringBuilder();
+ for (int i = 0; i < insertRows.size(); i++) {
+ if (i > 0) {
+ values.append(",\n");
+ }
+ values.append(insertRows.get(i));
+ }
+
+ String sql = String.format(INSERT_QUERY_TEMPLATE, tableName, values);
+ return JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+ }
+
+ private String formatDateForInsertQuery(Long dateVal) {
+ Date date=new Date(dateVal);
+ SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd");
+ return df2.format(date);
+ }
+
+ private String formatTimeForInsertQuery(Integer millis) {
+    return String.format("%02d:%02d:%02d", TimeUnit.MILLISECONDS.toHours(millis),
+ TimeUnit.MILLISECONDS.toMinutes(millis) % TimeUnit.HOURS.toMinutes(1),
+ TimeUnit.MILLISECONDS.toSeconds(millis) % TimeUnit.MINUTES.toSeconds(1));
+ }
+
+ private String formatTimeStampForInsertQuery(Long time) {
+ Date date = new Date(time);
+ Format format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
+ return format.format(date);
+ }
+
+ @Override
+  public FieldConverter getNewNullableIntConverter(int fieldId, String fieldName, FieldReader reader) {
+ return new NullableIntJDBCConverter(fieldId, fieldName, reader, fields);
+ }
+
+ public class NullableIntJDBCConverter extends FieldConverter {
Review comment:
I guess maybe we could do something with a Freemarker template for the
converters but I'm not convinced it's worth it now that we already have these
written.
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+ private static final Logger logger =
LoggerFactory.getLogger(JdbcRecordWriter.class);
+ public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+ private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s
VALUES\n%s";
+ private final String tableName;
+ private final Connection connection;
+ private final JdbcWriter config;
+ private final SqlDialect dialect;
+ private final List<Object> rowList;
+ private final List<String> insertRows;
+ private final List<JdbcWriterField> fields;
+ private StringBuilder rowString;
+
+ /*
+ * This map maps JDBC data types to their Drill equivalents. The basic
strategy is that if there
+ * is a Drill equivalent, then do the mapping as expected.
+ *
+ * All flavors of character fields are mapped to VARCHAR in Drill. All
versions of binary fields are
+ * mapped to VARBINARY.
+ */
+ static {
+ JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+ .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+ .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+ .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+ .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+ .put(MinorType.INT, java.sql.Types.INTEGER)
+ .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+ .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+ .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+ .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+ .put(MinorType.DATE, java.sql.Types.DATE)
+ .put(MinorType.TIME, java.sql.Types.TIME)
+ .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+ .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+ .build();
+ }
+
+ public JdbcRecordWriter(DataSource source, OperatorContext context, String
name, JdbcWriter config) {
+ this.tableName = name;
+ this.config = config;
+ rowList = new ArrayList<>();
+ insertRows = new ArrayList<>();
+ this.dialect = config.getPlugin().getDialect();
+
+ this.fields = new ArrayList<>();
+
+ try {
+ this.connection = source.getConnection();
+ } catch (SQLException e) {
+ throw UserException.connectionError()
+ .message("Unable to open JDBC connection for writing.")
+ .addContext(e.getSQLState())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> writerOptions) {
+
+ }
+
+ @Override
+ public void updateSchema(VectorAccessible batch) {
+ BatchSchema schema = batch.getSchema();
+ String columnName;
+ MinorType type;
+ String sql;
+ Statement statement;
+ boolean nullable = false;
+ JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+ for (MaterializedField field : schema) {
+ columnName = field.getName();
+ type = field.getType().getMinorType();
+ logger.debug("Adding column {} of type {}.", columnName, type);
+
+ if (field.getType().getMode() == DataMode.REPEATED) {
+ throw UserException.dataWriteError()
+ .message("Drill does not yet support writing arrays to JDBC. " +
columnName + " is an array.")
+ .build(logger);
+ }
+
+ if (field.getType().getMode() == DataMode.OPTIONAL) {
+ nullable = true;
+ }
+
+ int precision = field.getPrecision();
+ int scale = field.getScale();
+
+ queryBuilder.addColumn(columnName, field.getType().getMinorType(),
nullable, precision, scale);
+ }
+
+ sql = queryBuilder.getCreateTableQuery();
+ sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+ logger.debug("Final query: {}", sql);
+
+ // Execute the query to build the schema
+ try {
+ statement = connection.createStatement();
+ logger.debug("Executing CREATE query: {}", sql);
+ statement.execute(sql);
+ statement.close();
+ } catch (SQLException e) {
+ throw UserException.dataReadError(e)
+ .message("The JDBC storage plugin failed while trying to create the
schema. ")
+ .addContext("Sql", sql)
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ rowString = new StringBuilder();
+ rowList.clear();
+ rowString.append("(");
+ logger.debug("Start record");
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ logger.debug("Ending record");
+
+ // Add values to rowString
+ for (int i = 0; i < rowList.size(); i++) {
+ if (i > 0) {
+ rowString.append(", ");
+ }
+
+ // Add null value to rowstring
+ if (rowList.get(i) instanceof String && ((String)
rowList.get(i)).equalsIgnoreCase("null")) {
+ rowString.append("null");
+ continue;
+ }
+
+ JdbcWriterField currentField = fields.get(i);
+ if (currentField.getDataType() == MinorType.VARCHAR) {
+ String value = null;
+ // Get the string value
+ if (currentField.getMode() == DataMode.REQUIRED) {
+ VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+ value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+ // Escape any naughty characters
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } else {
+ try {
+ NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+ value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } catch (ClassCastException e) {
+ logger.error("Unable to read field: {}", rowList.get(i));
+ }
+ }
+
+ // Add to value string
+ rowString.append(value);
+ //rowString.append("'").append(value).append("'");
+ } else if (currentField.getDataType() == MinorType.DATE) {
+ String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(dateString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIME) {
+ String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+ String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else {
+ rowString.append(rowList.get(i));
+ }
+ }
+
+ rowString.append(")");
+ rowList.clear();
+ insertRows.add(rowString.toString());
+ logger.debug("End record: {}", rowString.toString());
+ }
+
+ @Override
+ public void abort() throws IOException {
+ logger.debug("Abort insert.");
+ }
+
+ @Override
+ public void cleanup() throws IOException {
Review comment:
It feels weird that we do the actual inserting in a method called
`cleanup` - is this the right place?
##########
File path:
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+ private static final Logger logger = LoggerFactory.getLogger(JdbcRecordWriter.class);
+ public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+ private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s VALUES\n%s";
+ private final String tableName;
+ private final Connection connection;
+ private final JdbcWriter config;
+ private final SqlDialect dialect;
+ private final List<Object> rowList;
+ private final List<String> insertRows;
+ private final List<JdbcWriterField> fields;
+ private StringBuilder rowString;
+
+ /*
+ * This map maps JDBC data types to their Drill equivalents. The basic strategy is that if there
+ * is a Drill equivalent, then do the mapping as expected.
+ *
+ * All flavors of character fields are mapped to VARCHAR in Drill. All versions of binary fields are
+ * mapped to VARBINARY.
+ */
+ static {
+ JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+ .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+ .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+ .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+ .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+ .put(MinorType.INT, java.sql.Types.INTEGER)
+ .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+ .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+ .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+ .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+ .put(MinorType.DATE, java.sql.Types.DATE)
+ .put(MinorType.TIME, java.sql.Types.TIME)
+ .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+ .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+ .build();
+ }
+
+ public JdbcRecordWriter(DataSource source, OperatorContext context, String name, JdbcWriter config) {
+ this.tableName = name;
+ this.config = config;
+ rowList = new ArrayList<>();
+ insertRows = new ArrayList<>();
+ this.dialect = config.getPlugin().getDialect();
+
+ this.fields = new ArrayList<>();
+
+ try {
+ this.connection = source.getConnection();
+ } catch (SQLException e) {
+ throw UserException.connectionError()
+ .message("Unable to open JDBC connection for writing.")
+ .addContext(e.getSQLState())
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void init(Map<String, String> writerOptions) {
+
+ }
+
+ @Override
+ public void updateSchema(VectorAccessible batch) {
+ BatchSchema schema = batch.getSchema();
+ String columnName;
+ MinorType type;
+ String sql;
+ Statement statement;
+ boolean nullable = false;
+ JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+ for (MaterializedField field : schema) {
+ columnName = field.getName();
+ type = field.getType().getMinorType();
+ logger.debug("Adding column {} of type {}.", columnName, type);
+
+ if (field.getType().getMode() == DataMode.REPEATED) {
+ throw UserException.dataWriteError()
+ .message("Drill does not yet support writing arrays to JDBC. " + columnName + " is an array.")
+ .build(logger);
+ }
+
+ if (field.getType().getMode() == DataMode.OPTIONAL) {
+ nullable = true;
+ }
+
+ int precision = field.getPrecision();
+ int scale = field.getScale();
+
+ queryBuilder.addColumn(columnName, field.getType().getMinorType(), nullable, precision, scale);
+ }
+
+ sql = queryBuilder.getCreateTableQuery();
+ sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+ logger.debug("Final query: {}", sql);
+
+ // Execute the query to build the schema
+ try {
+ statement = connection.createStatement();
+ logger.debug("Executing CREATE query: {}", sql);
+ statement.execute(sql);
+ statement.close();
+ } catch (SQLException e) {
+ throw UserException.dataReadError(e)
+ .message("The JDBC storage plugin failed while trying to create the schema.")
+ .addContext("Sql", sql)
+ .build(logger);
+ }
+ }
+
+ @Override
+ public void startRecord() throws IOException {
+ rowString = new StringBuilder();
+ rowList.clear();
+ rowString.append("(");
+ logger.debug("Start record");
+ }
+
+ @Override
+ public void endRecord() throws IOException {
+ logger.debug("Ending record");
+
+ // Add values to rowString
+ for (int i = 0; i < rowList.size(); i++) {
+ if (i > 0) {
+ rowString.append(", ");
+ }
+
+ // Add a null value to rowString
+ if (rowList.get(i) instanceof String && ((String) rowList.get(i)).equalsIgnoreCase("null")) {
+ rowString.append("null");
+ continue;
+ }
+
+ JdbcWriterField currentField = fields.get(i);
+ if (currentField.getDataType() == MinorType.VARCHAR) {
+ String value = null;
+ // Get the string value
+ if (currentField.getMode() == DataMode.REQUIRED) {
+ VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+ value = StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+ // Escape any naughty characters
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } else {
+ try {
+ NullableVarCharHolder nullableVarCharHolder = (NullableVarCharHolder) rowList.get(i);
+ value = StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+ value = JdbcDDLQueryUtils.sqlEscapeString(value);
+ } catch (ClassCastException e) {
+ logger.error("Unable to read field: {}", rowList.get(i));
+ }
+ }
+
+ // Add to value string
+ rowString.append(value);
+ //rowString.append("'").append(value).append("'");
+ } else if (currentField.getDataType() == MinorType.DATE) {
+ String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(dateString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIME) {
+ String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+ String timeString = formatTimeStampForInsertQuery((Long) rowList.get(i));
+ rowString.append("'").append(timeString).append("'");
+ } else {
+ rowString.append(rowList.get(i));
+ }
+ }
+
+ rowString.append(")");
+ rowList.clear();
+ insertRows.add(rowString.toString());
+ logger.debug("End record: {}", rowString.toString());
+ }
+
+ @Override
+ public void abort() throws IOException {
+ logger.debug("Abort insert.");
+ }
+
+ @Override
+ public void cleanup() throws IOException {
+ logger.debug("Cleanup record");
+ // Execute query
+ String insertQuery = buildInsertQuery();
+
+ try {
+ logger.debug("Executing insert query: {}", insertQuery);
+ Statement stmt = connection.createStatement();
+ stmt.execute(insertQuery);
+ logger.debug("Query complete");
+ // Close connection
+ AutoCloseables.closeSilently(stmt, connection);
+ } catch (SQLException e) {
+ logger.error("Error: {} ", e.getMessage());
+ throw new IOException();
+ }
+ }
+
+ private String buildInsertQuery() {
Review comment:
I think that the maximum number of records DBMSes allow in a `VALUES`
expression is on the order of 1e3 to 1e4. If Drill batch sizes can exceed
that, we're going to have a problem. A possible solution is to always
partition into conservative insert batches of, say, 500 records. The
`PreparedStatement` and `executeBatch` JDBC API usage in this answer
https://stackoverflow.com/a/3786127/1153953 might help to keep things as
efficient as possible.
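For illustration, here is a minimal sketch of that suggestion: accumulate parameters with `PreparedStatement.addBatch()` and flush every 500 rows with `executeBatch()`. The table and column names (`my_table`, `col1`, `col2`) are placeholders, not the names the Drill plugin would use, and the method assumes the caller supplies an open `java.sql.Connection`.

```java
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

public class BatchedInsertSketch {
  // Conservative cap suggested above; single multi-row VALUES lists
  // are typically limited to the low thousands by DBMSes.
  static final int BATCH_SIZE = 500;

  // Pure helper: how many executeBatch() calls totalRows rows need.
  static int batchCount(int totalRows, int batchSize) {
    return (totalRows + batchSize - 1) / batchSize;
  }

  // Hypothetical flush path using a parameterized INSERT instead of
  // one giant VALUES string.
  static void insertAll(Connection conn, List<Object[]> rows) throws SQLException {
    String sql = "INSERT INTO my_table (col1, col2) VALUES (?, ?)";
    try (PreparedStatement ps = conn.prepareStatement(sql)) {
      int pending = 0;
      for (Object[] row : rows) {
        ps.setObject(1, row[0]);
        ps.setObject(2, row[1]);
        ps.addBatch();
        if (++pending == BATCH_SIZE) {  // flush a full batch
          ps.executeBatch();
          pending = 0;
        }
      }
      if (pending > 0) {                // flush the remainder
        ps.executeBatch();
      }
    }
  }
}
```

This keeps each round trip bounded regardless of Drill's batch size, and the driver can often rewrite the batch into an efficient multi-row insert on its own.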
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
> Add Writer to JDBC Storage Plugin
> ---------------------------------
>
> Key: DRILL-8005
> URL: https://issues.apache.org/jira/browse/DRILL-8005
> Project: Apache Drill
> Issue Type: Improvement
> Components: Storage - JDBC
> Affects Versions: 1.19.0
> Reporter: Charles Givre
> Assignee: Charles Givre
> Priority: Major
> Fix For: 1.20.0
>
>
> Current implementation of Drill only allows writing to file systems. This
> issue proposes extending the JDBC plugin to allow writing to JDBC data
> sources. This will do so by implementing:
> CREATE TABLE AS
> DROP TABLE
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)