Repository: incubator-gobblin Updated Branches: refs/heads/master 15f201937 -> a6dfdc6d4
[GOBBLIN-630] Add a concrete implementation for Postgres writer Closes #2512 from spotvenky/postgres_connector_2 Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/a6dfdc6d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/a6dfdc6d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/a6dfdc6d Branch: refs/heads/master Commit: a6dfdc6d477fe78b2e80aedcb6554306250b8a59 Parents: 15f2019 Author: Venkatesh Iyer <[email protected]> Authored: Tue Dec 4 09:11:56 2018 -0800 Committer: Hung Tran <[email protected]> Committed: Tue Dec 4 09:11:56 2018 -0800 ---------------------------------------------------------------------- .../org/apache/gobblin/writer/Destination.java | 3 +- .../commands/JdbcWriterCommandsFactory.java | 2 + .../commands/PostgresBufferedInserter.java | 65 +++++++ .../writer/commands/PostgresWriterCommands.java | 175 +++++++++++++++++++ .../writer/PostgresWriterCommandsTest.java | 135 ++++++++++++++ 5 files changed, 379 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java ---------------------------------------------------------------------- diff --git a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java index 506d4ee..6c370b7 100644 --- a/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java +++ b/gobblin-api/src/main/java/org/apache/gobblin/writer/Destination.java @@ -35,7 +35,8 @@ public class Destination { HDFS, KAFKA, MYSQL, - TERADATA + TERADATA, + POSTGRES } // Type of destination 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java index aaf832f..c650249 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/JdbcWriterCommandsFactory.java @@ -43,6 +43,8 @@ public class JdbcWriterCommandsFactory { return new MySqlWriterCommands(destination.getProperties(), conn); case TERADATA: return new TeradataWriterCommands(destination.getProperties(), conn); + case POSTGRES: + return new PostgresWriterCommands(destination.getProperties(), conn); default: throw new IllegalArgumentException(destination.getType() + " is not supported"); } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java new file mode 100644 index 0000000..14f89f1 --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresBufferedInserter.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.SQLException;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcEntryDatum;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * A {@link BaseJdbcBufferedInserter} for Postgres that sends each batch as one multi-row
+ * {@code <insert prefix> (?, ...), (?, ...), ...;} prepared statement.
+ *
+ * <p>NOTE(review): the PostgreSQL wire protocol caps the number of bind parameters per statement;
+ * confirm the configured batch size multiplied by the column count stays below the driver's limit.
+ */
+@Slf4j
+public class PostgresBufferedInserter extends BaseJdbcBufferedInserter {
+
+  /** Template for one parenthesized row of bind placeholders. */
+  private static final String VALUE_FORMAT = "(%s)";
+
+  public PostgresBufferedInserter(State state, Connection conn) {
+    super(state, conn);
+  }
+
+  /**
+   * Builds the SQL for a batch: the inherited insert prefix followed by {@code batchSize}
+   * comma-separated row tuples, each containing one {@code ?} per column.
+   *
+   * @param batchSize number of rows the statement binds; values below 1 still yield a single tuple
+   * @return the statement text, terminated by {@code ;}
+   */
+  @Override
+  protected String createPrepareStatementStr(int batchSize) {
+    String values =
+        String.format(VALUE_FORMAT, JOINER_ON_COMMA.useForNull("?").join(new String[this.columnNames.size()]));
+    StringBuilder sb = new StringBuilder(this.insertStmtPrefix).append(values);
+    for (int i = 1; i < batchSize; i++) {
+      sb.append(',').append(values);
+    }
+    return sb.append(';').toString();
+  }
+
+  /**
+   * Binds every pending row into {@code pstmt} (row by row, column by column, 1-based) and executes it.
+   *
+   * @param pstmt statement to bind; presumably prepared by the base class from
+   *     {@link #createPrepareStatementStr(int)} for the current number of pending rows
+   * @return the result of {@link PreparedStatement#execute()}
+   * @throws SQLException if binding or execution fails
+   */
+  @Override
+  protected boolean insertBatch(PreparedStatement pstmt)
+      throws SQLException {
+    int i = 0;
+    pstmt.clearParameters();
+    for (JdbcEntryData pendingEntry : this.pendingInserts) {
+      for (JdbcEntryDatum datum : pendingEntry) {
+        pstmt.setObject(++i, datum.getVal());
+      }
+    }
+    // Parameterized so pstmt.toString() -- which some drivers render with every bound value -- is only
+    // evaluated when debug logging is enabled, not on every batch.
+    log.debug("Executing SQL {}", pstmt);
+    return pstmt.execute();
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
new file mode 100644
index 0000000..87e89c3
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/writer/commands/PostgresWriterCommands.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer.commands;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Locale;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcEntryData;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+
+import com.google.common.collect.ImmutableMap;
+
+import lombok.extern.slf4j.Slf4j;
+
+
+/**
+ * The implementation of JdbcWriterCommands for Postgres.
+ *
+ * <p>Every statement is built from a {@code %s.%s} template whose qualifier is the {@code database}
+ * argument, i.e. it is used as the Postgres schema name; {@link #retrieveDateColumns(String, String)}
+ * likewise matches it against {@code information_schema.columns.table_schema}.
+ * NOTE(review): identifiers are interpolated unquoted, so they are assumed to come from trusted configuration.
+ */
+@Slf4j
+public class PostgresWriterCommands implements JdbcWriterCommands {
+
+  private static final String CREATE_TABLE_SQL_FORMAT = "CREATE TABLE %s.%s (LIKE %s.%s)";
+  private static final String SELECT_SQL_FORMAT = "SELECT COUNT(*) FROM %s.%s";
+  private static final String TRUNCATE_TABLE_FORMAT = "TRUNCATE TABLE %s.%s";
+  private static final String DROP_TABLE_SQL_FORMAT = "DROP TABLE %s.%s";
+  private static final String INFORMATION_SCHEMA_SELECT_SQL_PSTMT =
+      "SELECT column_name, data_type FROM information_schema.columns WHERE table_schema = ? AND table_name = ?";
+  private static final String COPY_INSERT_STATEMENT_FORMAT = "INSERT INTO %s.%s SELECT * FROM %s.%s";
+  private static final String DELETE_STATEMENT_FORMAT = "DELETE FROM %s.%s";
+
+  /**
+   * Upper-cased {@code information_schema.columns.data_type} values treated as date/time columns,
+   * and the {@link JdbcType} each one maps to.
+   */
+  private static final Map<String, JdbcType> DATE_TIME_TYPES = ImmutableMap.<String, JdbcType>builder()
+      .put("DATE", JdbcType.DATE)
+      .put("TIME WITH TIME ZONE", JdbcType.TIME)
+      .put("TIME WITHOUT TIME ZONE", JdbcType.TIME)
+      .put("TIMESTAMP WITH TIME ZONE", JdbcType.TIMESTAMP)
+      .put("TIMESTAMP WITHOUT TIME ZONE", JdbcType.TIMESTAMP)
+      .build();
+
+  private final JdbcBufferedInserter jdbcBufferedWriter;
+  private final Connection conn;
+
+  /**
+   * @param state job state handed to the {@link PostgresBufferedInserter}
+   * @param conn connection used for every statement this instance issues
+   */
+  public PostgresWriterCommands(State state, Connection conn) {
+    this.conn = conn;
+    this.jdbcBufferedWriter = new PostgresBufferedInserter(state, conn);
+  }
+
+  /**
+   * Disables auto-commit on the connection passed to the constructor. Both arguments are ignored.
+   * NOTE(review): configures {@code this.conn}, not the {@code conn} argument -- presumably the same
+   * connection; confirm with the caller.
+   */
+  @Override
+  public void setConnectionParameters(Properties properties, Connection conn)
+      throws SQLException {
+    // Postgres writer always uses one single transaction
+    this.conn.setAutoCommit(false);
+  }
+
+  /** Buffers one record through the {@link PostgresBufferedInserter}. */
+  @Override
+  public void insert(String databaseName, String table, JdbcEntryData jdbcEntryData)
+      throws SQLException {
+    this.jdbcBufferedWriter.insert(databaseName, table, jdbcEntryData);
+  }
+
+  /** Flushes records still held by the {@link PostgresBufferedInserter}. */
+  @Override
+  public void flush()
+      throws SQLException {
+    this.jdbcBufferedWriter.flush();
+  }
+
+  /** Creates {@code databaseName.targetTableName} via {@code CREATE TABLE ... (LIKE databaseName.fromStructure)}. */
+  @Override
+  public void createTableStructure(String databaseName, String fromStructure, String targetTableName)
+      throws SQLException {
+    String sql = String.format(CREATE_TABLE_SQL_FORMAT, databaseName, targetTableName, databaseName, fromStructure);
+    execute(sql);
+  }
+
+  /**
+   * Returns whether {@code database.table} has no rows, using {@code SELECT COUNT(*)}.
+   *
+   * @throws RuntimeException if the count query unexpectedly returns no row
+   */
+  @Override
+  public boolean isEmpty(String database, String table)
+      throws SQLException {
+    String sql = String.format(SELECT_SQL_FORMAT, database, table);
+    try (PreparedStatement pstmt = this.conn.prepareStatement(sql); ResultSet resultSet = pstmt.executeQuery()) {
+      // prepareStatement(String) produces a TYPE_FORWARD_ONLY result set, on which ResultSet.first()
+      // throws SQLException; next() is the portable way to move onto the single COUNT(*) row.
+      if (!resultSet.next()) {
+        throw new RuntimeException("Should have received at least one row from SQL " + pstmt);
+      }
+      return 0 == resultSet.getInt(1);
+    }
+  }
+
+  /** Removes all rows from {@code database.table} with {@code TRUNCATE TABLE}. */
+  @Override
+  public void truncate(String database, String table)
+      throws SQLException {
+    String sql = String.format(TRUNCATE_TABLE_FORMAT, database, table);
+    execute(sql);
+  }
+
+  /** Removes all rows from {@code database.table} with an unqualified {@code DELETE}. */
+  @Override
+  public void deleteAll(String database, String table)
+      throws SQLException {
+    String deleteSql = String.format(DELETE_STATEMENT_FORMAT, database, table);
+    execute(deleteSql);
+  }
+
+  /** Drops {@code database.table}. */
+  @Override
+  public void drop(String database, String table)
+      throws SQLException {
+    log.info("Dropping table {}", table);
+    String sql = String.format(DROP_TABLE_SQL_FORMAT, database, table);
+    execute(sql);
+  }
+
+  /**
+   * https://documentation.progress.com/output/DataDirect/DataDirectCloud/index.html#page/queries/postgresql-data-types.html
+   * {@inheritDoc}
+   *
+   * <p>Looks up {@code information_schema.columns} for schema {@code database} and table {@code table} and
+   * keeps only the columns whose upper-cased {@code data_type} is a known date/time type.
+   *
+   * @throws IllegalArgumentException if information_schema.columns returns no rows for the table
+   * @see org.apache.gobblin.writer.commands.JdbcWriterCommands#retrieveDateColumns(String, String)
+   */
+  @Override
+  public Map<String, JdbcType> retrieveDateColumns(String database, String table)
+      throws SQLException {
+    ImmutableMap.Builder<String, JdbcType> dateColumnsBuilder = ImmutableMap.builder();
+
+    // Scrollable so that first() can both detect the "no columns" case and position on the first row.
+    try (PreparedStatement pstmt = this.conn
+        .prepareStatement(INFORMATION_SCHEMA_SELECT_SQL_PSTMT, ResultSet.TYPE_SCROLL_INSENSITIVE,
+            ResultSet.CONCUR_READ_ONLY)) {
+      pstmt.setString(1, database);
+      pstmt.setString(2, table);
+      log.info("Retrieving column type information from SQL: {}", pstmt);
+      try (ResultSet rs = pstmt.executeQuery()) {
+        if (!rs.first()) {
+          throw new IllegalArgumentException("No result from information_schema.columns");
+        }
+        do {
+          // Locale.ROOT keeps the case mapping locale-independent; under e.g. a Turkish default locale,
+          // a plain toUpperCase() maps 'i' to a dotted capital I and "time"/"timestamp" would never match.
+          String type = rs.getString("data_type").toUpperCase(Locale.ROOT);
+          JdbcType convertedType = DATE_TIME_TYPES.get(type);
+          if (convertedType != null) {
+            dateColumnsBuilder.put(rs.getString("column_name"), convertedType);
+          }
+        } while (rs.next());
+      }
+    }
+    return dateColumnsBuilder.build();
+  }
+
+  /** Copies all rows of {@code databaseName.from} into {@code databaseName.to} with {@code INSERT ... SELECT *}. */
+  @Override
+  public void copyTable(String databaseName, String from, String to)
+      throws SQLException {
+    String sql = String.format(COPY_INSERT_STATEMENT_FORMAT, databaseName, to, databaseName, from);
+    execute(sql);
+  }
+
+  /** Logs and executes a parameterless statement, closing it afterwards. */
+  private void execute(String sql)
+      throws SQLException {
+    log.info("Executing SQL {}", sql);
+    try (PreparedStatement pstmt = this.conn.prepareStatement(sql)) {
+      pstmt.execute();
+    }
+  }
+
+  @Override
+  public String toString() {
+    return String.format("PostgresWriterCommands [bufferedWriter=%s]", this.jdbcBufferedWriter);
+  }
+}
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/a6dfdc6d/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
----------------------------------------------------------------------
diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
new file mode 100644
index 0000000..ac46d4d
--- /dev/null
+++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/writer/PostgresWriterCommandsTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.writer;
+
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.converter.jdbc.JdbcType;
+import org.apache.gobblin.writer.commands.PostgresWriterCommands;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.sun.rowset.JdbcRowSetImpl;
+
+import static org.mockito.Mockito.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+
+/**
+ * Tests {@link PostgresWriterCommands} against a mocked {@link Connection} whose metadata query
+ * returns a hand-built, in-memory result set.
+ */
+@Test(groups = {"gobblin.writer"})
+public class PostgresWriterCommandsTest {
+
+  /** (column_name, data_type) pairs served by the fake information_schema result set, in row order. */
+  private static final String[][] USERS_COLUMNS = {
+      {"name", "varchar"},
+      {"favorite_number", "varchar"},
+      {"favorite_color", "varchar"},
+      {"date_of_birth", "date"},
+      {"last_modified", "time without time zone"},
+      {"created", "timestamp with time zone"}
+  };
+
+  /** Only the date/time-typed columns should come back, each mapped to its JdbcType. */
+  @Test
+  public void testPostgresDateTypeRetrieval()
+      throws SQLException {
+    Connection mockConn = mock(Connection.class);
+    PreparedStatement mockStmt = mock(PreparedStatement.class);
+    ResultSet fakeRows = createMockResultSet();
+
+    when(mockConn.prepareStatement(any(String.class), any(Integer.class), any(Integer.class))).thenReturn(mockStmt);
+    when(mockStmt.executeQuery()).thenReturn(fakeRows);
+
+    Map<String, JdbcType> expected = ImmutableMap.of(
+        "date_of_birth", JdbcType.DATE,
+        "last_modified", JdbcType.TIME,
+        "created", JdbcType.TIMESTAMP);
+
+    Map<String, JdbcType> actual =
+        new PostgresWriterCommands(new State(), mockConn).retrieveDateColumns("db", "users");
+
+    Assert.assertEquals(expected, actual);
+  }
+
+  /**
+   * Builds a minimal ResultSet over {@link #USERS_COLUMNS}: supports first(), next() and getString(label),
+   * and raises SQLException when read before positioning or for an unknown column label.
+   */
+  private ResultSet createMockResultSet() {
+    final List<Map<String, String>> rows = new ArrayList<>();
+    for (String[] column : USERS_COLUMNS) {
+      Map<String, String> row = new HashMap<>();
+      row.put("column_name", column[0]);
+      row.put("data_type", column[1]);
+      rows.add(row);
+    }
+
+    return new JdbcRowSetImpl() {
+      private Iterator<Map<String, String>> rowIterator = rows.iterator();
+      private Map<String, String> currentRow = null;
+
+      @Override
+      public boolean first() {
+        rowIterator = rows.iterator();
+        return next();
+      }
+
+      @Override
+      public boolean next() {
+        if (!rowIterator.hasNext()) {
+          return false;
+        }
+        currentRow = rowIterator.next();
+        return true;
+      }
+
+      @Override
+      public String getString(String columnLabel)
+          throws SQLException {
+        if (currentRow == null) {
+          throw new SQLException("NPE on current cursor.");
+        }
+        String value = currentRow.get(columnLabel);
+        if (value == null) {
+          throw new SQLException(columnLabel + " does not exist.");
+        }
+        return value;
+      }
+    };
+  }
+}
