dzamo commented on a change in pull request #2327:
URL: https://github.com/apache/drill/pull/2327#discussion_r724922193



##########
File path: 
contrib/storage-jdbc/src/main/java/org/apache/drill/exec/store/jdbc/JdbcRecordWriter.java
##########
@@ -0,0 +1,859 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.drill.exec.store.jdbc;
+
+import org.apache.calcite.sql.SqlDialect;
+import org.apache.drill.common.AutoCloseables;
+import org.apache.drill.common.exceptions.UserException;
+import org.apache.drill.common.types.TypeProtos.DataMode;
+import org.apache.drill.common.types.TypeProtos.MinorType;
+import org.apache.drill.exec.expr.fn.impl.StringFunctionHelpers;
+import org.apache.drill.exec.expr.holders.BigIntHolder;
+import org.apache.drill.exec.expr.holders.BitHolder;
+import org.apache.drill.exec.expr.holders.DateHolder;
+import org.apache.drill.exec.expr.holders.Float4Holder;
+import org.apache.drill.exec.expr.holders.Float8Holder;
+import org.apache.drill.exec.expr.holders.IntHolder;
+import org.apache.drill.exec.expr.holders.NullableBigIntHolder;
+import org.apache.drill.exec.expr.holders.NullableBitHolder;
+import org.apache.drill.exec.expr.holders.NullableDateHolder;
+import org.apache.drill.exec.expr.holders.NullableFloat4Holder;
+import org.apache.drill.exec.expr.holders.NullableFloat8Holder;
+import org.apache.drill.exec.expr.holders.NullableIntHolder;
+import org.apache.drill.exec.expr.holders.NullableSmallIntHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeHolder;
+import org.apache.drill.exec.expr.holders.NullableTimeStampHolder;
+import org.apache.drill.exec.expr.holders.NullableTinyIntHolder;
+import org.apache.drill.exec.expr.holders.NullableVarCharHolder;
+import org.apache.drill.exec.expr.holders.NullableVarDecimalHolder;
+import org.apache.drill.exec.expr.holders.SmallIntHolder;
+import org.apache.drill.exec.expr.holders.TimeHolder;
+import org.apache.drill.exec.expr.holders.TimeStampHolder;
+import org.apache.drill.exec.expr.holders.TinyIntHolder;
+import org.apache.drill.exec.expr.holders.VarCharHolder;
+import org.apache.drill.exec.expr.holders.VarDecimalHolder;
+import org.apache.drill.exec.ops.OperatorContext;
+import org.apache.drill.exec.record.BatchSchema;
+import org.apache.drill.exec.record.MaterializedField;
+import org.apache.drill.exec.record.VectorAccessible;
+import org.apache.drill.exec.store.AbstractRecordWriter;
+import org.apache.drill.exec.store.EventBasedRecordWriter.FieldConverter;
+import org.apache.drill.exec.store.jdbc.utils.JdbcDDLQueryUtils;
+import org.apache.drill.exec.store.jdbc.utils.JdbcQueryBuilder;
+import org.apache.drill.exec.util.DecimalUtility;
+import org.apache.drill.exec.vector.complex.reader.FieldReader;
+import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.text.Format;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class JdbcRecordWriter extends AbstractRecordWriter {
+
+  private static final Logger logger = 
LoggerFactory.getLogger(JdbcRecordWriter.class);
+  public static final ImmutableMap<MinorType, Integer> JDBC_TYPE_MAPPINGS;
+
+  private static final String INSERT_QUERY_TEMPLATE = "INSERT INTO %s 
VALUES\n%s";
+  private final String tableName;
+  private final Connection connection;
+  private final JdbcWriter config;
+  private final SqlDialect dialect;
+  private final List<Object> rowList;
+  private final List<String> insertRows;
+  private final List<JdbcWriterField> fields;
+  private StringBuilder rowString;
+
+  /*
+   * This map maps JDBC data types to their Drill equivalents.  The basic 
strategy is that if there
+   * is a Drill equivalent, then do the mapping as expected.
+   *
+   * All flavors of character fields are mapped to VARCHAR in Drill. All 
versions of binary fields are
+   * mapped to VARBINARY.
+   */
+  static {
+    JDBC_TYPE_MAPPINGS = ImmutableMap.<MinorType, Integer>builder()
+      .put(MinorType.FLOAT8, java.sql.Types.NUMERIC)
+      .put(MinorType.FLOAT4, java.sql.Types.NUMERIC)
+      .put(MinorType.TINYINT, java.sql.Types.TINYINT)
+      .put(MinorType.SMALLINT, java.sql.Types.SMALLINT)
+      .put(MinorType.INT, java.sql.Types.INTEGER)
+      .put(MinorType.BIGINT, java.sql.Types.BIGINT)
+      .put(MinorType.VARCHAR, java.sql.Types.VARCHAR)
+      .put(MinorType.VARBINARY, java.sql.Types.VARBINARY)
+      .put(MinorType.VARDECIMAL, java.sql.Types.DECIMAL)
+      .put(MinorType.DATE, java.sql.Types.DATE)
+      .put(MinorType.TIME, java.sql.Types.TIME)
+      .put(MinorType.TIMESTAMP, java.sql.Types.TIMESTAMP)
+      .put(MinorType.BIT, java.sql.Types.BOOLEAN)
+      .build();
+  }
+
+  public JdbcRecordWriter(DataSource source, OperatorContext context, String 
name, JdbcWriter config) {
+    this.tableName = name;
+    this.config = config;
+    rowList = new ArrayList<>();
+    insertRows = new ArrayList<>();
+    this.dialect = config.getPlugin().getDialect();
+
+    this.fields = new ArrayList<>();
+
+    try {
+      this.connection = source.getConnection();
+    } catch (SQLException e) {
+      throw UserException.connectionError()
+        .message("Unable to open JDBC connection for writing.")
+        .addContext(e.getSQLState())
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void init(Map<String, String> writerOptions) {
+
+  }
+
+  @Override
+  public void updateSchema(VectorAccessible batch) {
+    BatchSchema schema = batch.getSchema();
+    String columnName;
+    MinorType type;
+    String sql;
+    Statement statement;
+    boolean nullable = false;
+    JdbcQueryBuilder queryBuilder = new JdbcQueryBuilder(tableName, dialect);
+
+    for (MaterializedField field : schema) {
+      columnName = field.getName();
+      type = field.getType().getMinorType();
+      logger.debug("Adding column {} of type {}.", columnName, type);
+
+      if (field.getType().getMode() == DataMode.REPEATED) {
+        throw UserException.dataWriteError()
+          .message("Drill does not yet support writing arrays to JDBC. " + 
columnName + " is an array.")
+          .build(logger);
+      }
+
+      if (field.getType().getMode() == DataMode.OPTIONAL) {
+        nullable = true;
+      }
+
+      int precision = field.getPrecision();
+      int scale = field.getScale();
+
+      queryBuilder.addColumn(columnName, field.getType().getMinorType(), 
nullable, precision, scale);
+    }
+
+    sql = queryBuilder.getCreateTableQuery();
+    sql = JdbcDDLQueryUtils.cleanDDLQuery(sql, dialect);
+    logger.debug("Final query: {}", sql);
+
+    // Execute the query to build the schema
+    try {
+      statement = connection.createStatement();
+      logger.debug("Executing CREATE query: {}", sql);
+      statement.execute(sql);
+      statement.close();
+    } catch (SQLException e) {
+      throw UserException.dataReadError(e)
+        .message("The JDBC storage plugin failed while trying to create the 
schema. ")
+        .addContext("Sql", sql)
+        .build(logger);
+    }
+  }
+
+  @Override
+  public void startRecord() throws IOException {
+    rowString = new StringBuilder();
+    rowList.clear();
+    rowString.append("(");
+    logger.debug("Start record");
+  }
+
+  @Override
+  public void endRecord() throws IOException {
+    logger.debug("Ending record");
+
+    // Add values to rowString
+    for (int i = 0; i < rowList.size(); i++) {
+      if (i > 0) {
+        rowString.append(", ");
+      }
+
+      // Add null value to rowstring
+      if (rowList.get(i) instanceof String && ((String) 
rowList.get(i)).equalsIgnoreCase("null")) {
+        rowString.append("null");
+        continue;
+      }
+
+      JdbcWriterField currentField = fields.get(i);
+      if (currentField.getDataType() == MinorType.VARCHAR) {
+        String value = null;
+        // Get the string value
+        if (currentField.getMode() == DataMode.REQUIRED) {
+          VarCharHolder varCharHolder = (VarCharHolder) rowList.get(i);
+          value = 
StringFunctionHelpers.getStringFromVarCharHolder(varCharHolder);
+          // Escape any naughty characters
+          value = JdbcDDLQueryUtils.sqlEscapeString(value);
+        } else {
+          try {
+            NullableVarCharHolder nullableVarCharHolder = 
(NullableVarCharHolder) rowList.get(i);
+            value = 
StringFunctionHelpers.getStringFromVarCharHolder(nullableVarCharHolder);
+            value = JdbcDDLQueryUtils.sqlEscapeString(value);
+          } catch (ClassCastException e) {
+            logger.error("Unable to read field: {}",  rowList.get(i));
+          }
+        }
+
+        // Add to value string
+        rowString.append(value);
+        //rowString.append("'").append(value).append("'");
+      } else if (currentField.getDataType() == MinorType.DATE) {
+        String dateString = formatDateForInsertQuery((Long) rowList.get(i));
+        rowString.append("'").append(dateString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIME) {
+        String timeString = formatTimeForInsertQuery((Integer) rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else if (currentField.getDataType() == MinorType.TIMESTAMP) {
+        String timeString = formatTimeStampForInsertQuery((Long) 
rowList.get(i));
+        rowString.append("'").append(timeString).append("'");
+      } else {
+        rowString.append(rowList.get(i));
+      }
+    }
+
+    rowString.append(")");
+    rowList.clear();
+    insertRows.add(rowString.toString());
+    logger.debug("End record: {}", rowString.toString());
+  }
+
+  @Override
+  public void abort() throws IOException {
+    logger.debug("Abort insert.");
+  }
+
+  @Override
+  public void cleanup() throws IOException {
+    logger.debug("Cleanup record");
+    // Execute query
+    String insertQuery = buildInsertQuery();
+
+    try {
+      logger.debug("Executing insert query: {}", insertQuery);
+      Statement stmt = connection.createStatement();
+      stmt.execute(insertQuery);
+      logger.debug("Query complete");
+      // Close connection
+      AutoCloseables.closeSilently(stmt, connection);
+    } catch (SQLException e) {
+      logger.error("Error: {} ", e.getMessage());
+      throw new IOException();
+    }
+  }
+
+  private String buildInsertQuery() {

Review comment:
       I think that the maximum number of records DBMSes allow in a `VALUES` 
expression is commonly order 1e3 to 1e4.  If Drill batch sizes can exceed that 
we're going to have a problem.  A possible solution is to always partition into 
conservative insert batches of, say 500 records.  The `PreparedStatement` and 
`executeBatch` JDBC API usage in this answer 
https://stackoverflow.com/a/3786127/1153953 might help to keep things as 
efficient as possible.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to