Repository: incubator-gobblin Updated Branches: refs/heads/master 8a374f207 -> 05bf034e3
[GOBBLIN-203] Add Postgresql source and extractor Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e12fa76d Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e12fa76d Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e12fa76d Branch: refs/heads/master Commit: e12fa76dc6e4948c1d8ecdc8bf6d4ceabdbfc222 Parents: ce60d2c Author: tilakpatidar <[email protected]> Authored: Thu Aug 10 11:53:58 2017 +0530 Committer: tilakpatidar <[email protected]> Committed: Fri Aug 11 10:08:51 2017 +0530 ---------------------------------------------------------------------- .../extract/jdbc/PostgresqlSource.java | 38 +++ .../source/jdbc/PostgresqlExtractor.java | 265 +++++++++++++++++++ .../source/jdbc/PostgresqlExtractorTest.java | 147 ++++++++++ 3 files changed, 450 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java new file mode 100644 index 0000000..e263a7e --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/extractor/extract/jdbc/PostgresqlSource.java @@ -0,0 +1,38 @@ +package org.apache.gobblin.source.extractor.extract.jdbc; + +import java.io.IOException; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.source.extractor.Extractor; +import org.apache.gobblin.source.extractor.exception.ExtractPrepareException; +import org.apache.gobblin.source.extractor.extract.QueryBasedSource; +import org.apache.gobblin.source.jdbc.PostgresqlExtractor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; + + +/** + * An implementation of postgresql source to get work units + * + * @author tilakpatidar + */ + +public class PostgresqlSource extends QueryBasedSource<JsonArray, JsonElement> { + private static final Logger LOG = LoggerFactory.getLogger(PostgresqlSource.class); + + @Override + public Extractor<JsonArray, JsonElement> getExtractor(WorkUnitState state) + throws IOException { + Extractor<JsonArray, JsonElement> extractor; + try { + extractor = new PostgresqlExtractor(state).build(); + } catch (ExtractPrepareException e) { + LOG.error("Failed to prepare extractor: error - " + e.getMessage()); + throw new IOException(e); + } + return extractor; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java new file mode 100644 index 0000000..3fcd411 --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/source/jdbc/PostgresqlExtractor.java @@ -0,0 +1,265 @@ +package org.apache.gobblin.source.jdbc; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.StringUtils; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.source.extractor.DataRecordException; +import org.apache.gobblin.source.extractor.exception.HighWatermarkException; +import org.apache.gobblin.source.extractor.exception.RecordCountException; +import org.apache.gobblin.source.extractor.exception.SchemaException; +import org.apache.gobblin.source.extractor.extract.Command; +import org.apache.gobblin.source.extractor.utils.Utils; +import org.apache.gobblin.source.extractor.watermark.Predicate; +import org.apache.gobblin.source.extractor.watermark.WatermarkType; +import org.apache.gobblin.source.workunit.WorkUnit; + +import com.google.common.collect.ImmutableMap; +import com.google.gson.JsonElement; + +import lombok.extern.slf4j.Slf4j; + + +@Slf4j +public class PostgresqlExtractor extends JdbcExtractor { + private static final String POSTGRES_TIMESTAMP_FORMAT = "yyyy-MM-dd HH:mm:ss"; + private static final String POSTGRES_DATE_FORMAT = "yyyy-MM-dd"; + private static final String POSTGRES_HOUR_FORMAT = "HH"; + private static final long SAMPLERECORDCOUNT = -1; + + public PostgresqlExtractor(WorkUnitState workUnitState) { + super(workUnitState); + } + + @Override + public String getHourPredicateCondition(String column, long value, String valueFormat, String operator) { + log.debug("Getting hour predicate for Postgres"); + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, POSTGRES_HOUR_FORMAT); + return Utils.getCoalesceColumnNames(column) + " " + operator + " '" + formattedvalue + "'"; + } + + @Override + public String getDatePredicateCondition(String column, long value, String valueFormat, String operator) { + log.debug("Getting date predicate for Postgres"); + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, POSTGRES_DATE_FORMAT); + return Utils.getCoalesceColumnNames(column) + " " + operator + " '" + formattedvalue + "'"; + } + + @Override + public String getTimestampPredicateCondition(String column, long value, String valueFormat, String operator) { + log.debug("Getting timestamp predicate for Postgres"); + String formattedvalue = Utils.toDateTimeFormat(Long.toString(value), valueFormat, POSTGRES_TIMESTAMP_FORMAT); + return Utils.getCoalesceColumnNames(column) + " " + operator + " '" + formattedvalue + "'"; + } + + @Override + public List<Command> getSchemaMetadata(String schema, String entity) + throws SchemaException { + log.debug("Build query to get schema"); + List<Command> commands = new ArrayList<>(); + List<String> queryParams = Arrays.asList(entity, schema); + + String metadataSql = "select col.column_name, col.data_type, " + + "case when CHARACTER_OCTET_LENGTH is null then 0 else 0 end as length, " + + "case when NUMERIC_PRECISION is null then 0 else NUMERIC_PRECISION end as precesion, " + + "case when NUMERIC_SCALE is null then 0 else NUMERIC_SCALE end as scale, " + + "case when is_nullable='NO' then 'false' else 'true' end as nullable, '' as format, " + "'' as comment " + + "from information_schema.COLUMNS col " + + "WHERE upper(col.table_name)=upper(?) AND upper(col.table_schema)=upper(?) " + + "order by col.ORDINAL_POSITION"; + + commands.add(getCommand(metadataSql, JdbcCommand.JdbcCommandType.QUERY)); + commands.add(getCommand(queryParams, JdbcCommand.JdbcCommandType.QUERYPARAMS)); + return commands; + } + + @Override + public List<Command> getHighWatermarkMetadata(String schema, String entity, String watermarkColumn, + List<Predicate> predicateList) + throws HighWatermarkException { + log.debug("Build query to get high watermark"); + List<Command> commands = new ArrayList<>(); + + String columnProjection = "max(" + Utils.getCoalesceColumnNames(watermarkColumn) + ")"; + String watermarkFilter = this.concatPredicates(predicateList); + String query = this.getExtractSql(); + + if (StringUtils.isBlank(watermarkFilter)) { + watermarkFilter = "1=1"; + } + query = query.replace(this.getOutputColumnProjection(), columnProjection) + .replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL, watermarkFilter); + + commands.add(getCommand(query, JdbcCommand.JdbcCommandType.QUERY)); + return commands; + } + + @Override + public List<Command> getCountMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) + throws RecordCountException { + log.debug("Build query to get source record count"); + List<Command> commands = new ArrayList<>(); + + String columnProjection = "COUNT(1)"; + String watermarkFilter = this.concatPredicates(predicateList); + String query = this.getExtractSql(); + + if (StringUtils.isBlank(watermarkFilter)) { + watermarkFilter = "1=1"; + } + + query = query.replace(this.getOutputColumnProjection(), columnProjection) + .replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL, watermarkFilter); + String sampleFilter = this.constructSampleClause(); + query = query + sampleFilter; + + if (!StringUtils.isEmpty(sampleFilter)) { + query = "SELECT COUNT(1) FROM (" + query.replace(" COUNT(1) ", " 1 ") + ")temp"; + } + commands.add(getCommand(query, JdbcCommand.JdbcCommandType.QUERY)); + return commands; + } + + @Override + public List<Command> getDataMetadata(String schema, String entity, WorkUnit workUnit, List<Predicate> predicateList) + throws DataRecordException { + log.debug("Build query to extract data"); + List<Command> commands = new ArrayList<>(); + int fetchsize = this.workUnitState.getPropAsInt(ConfigurationKeys.SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE, + ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_JDBC_RESULTSET_FETCH_SIZE); + String watermarkFilter = this.concatPredicates(predicateList); + String query = this.getExtractSql(); + if (StringUtils.isBlank(watermarkFilter)) { + watermarkFilter = "1=1"; + } + + query = query.replace(ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_WATERMARK_PREDICATE_SYMBOL, watermarkFilter); + String sampleFilter = this.constructSampleClause(); + query = query + sampleFilter; + + commands.add(getCommand(query, JdbcCommand.JdbcCommandType.QUERY)); + commands.add(getCommand(fetchsize, JdbcCommand.JdbcCommandType.FETCHSIZE)); + return commands; + } + + @Override + public String getConnectionUrl() { + String host = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_HOST_NAME); + String port = this.workUnitState.getProp(ConfigurationKeys.SOURCE_CONN_PORT); + String database = this.workUnitState.getProp(ConfigurationKeys.SOURCE_QUERYBASED_SCHEMA); + return "jdbc:postgresql://" + host.trim() + ":" + port + "/" + database.trim(); + } + + /** {@inheritdoc} */ + @Override + protected boolean convertBitToBoolean() { + return false; + } + + @Override + public Map<String, String> getDataTypeMap() { + Map<String, String> dataTypeMap = + ImmutableMap.<String, String>builder().put("tinyint", "int").put("smallint", "int").put("mediumint", "int") + .put("int", "int").put("bigint", "long").put("float", "float").put("double", "double") + .put("decimal", "double").put("numeric", "double").put("date", "date").put("timestamp", "timestamp") + .put("datetime", "timestamp").put("time", "time").put("char", "string").put("varchar", "string") + .put("varbinary", "string").put("text", "string").put("tinytext", "string").put("mediumtext", "string") + .put("longtext", "string").put("blob", "string").put("tinyblob", "string").put("mediumblob", "string") + .put("longblob", "string").put("enum", "string").build(); + return dataTypeMap; + } + + @Override + public String getWatermarkSourceFormat(WatermarkType watermarkType) { + String columnFormat = null; + switch (watermarkType) { + case TIMESTAMP: + columnFormat = "yyyy-MM-dd HH:mm:ss"; + break; + case DATE: + columnFormat = "yyyy-MM-dd"; + break; + default: + log.error("Watermark type " + watermarkType.toString() + " not recognized"); + } + return columnFormat; + } + + @Override + public long exractSampleRecordCountFromQuery(String query) { + if (StringUtils.isBlank(query)) { + return SAMPLERECORDCOUNT; + } + + long recordcount = SAMPLERECORDCOUNT; + + String limit = null; + String inputQuery = query.toLowerCase(); + int limitIndex = inputQuery.indexOf(" limit "); + if (limitIndex > 0) { + limit = query.substring(limitIndex + 7).trim(); + } + + if (StringUtils.isNotBlank(limit)) { + try { + recordcount = Long.parseLong(limit); + } catch (Exception e) { + log.error("Ignoring incorrct limit value in input query:" + limit); + } + } + return recordcount; + } + + @Override + public String removeSampleClauseFromQuery(String query) { + if (StringUtils.isBlank(query)) { + return null; + } + String limitString = ""; + String inputQuery = query.toLowerCase(); + int limitIndex = inputQuery.indexOf(" limit"); + if (limitIndex > 0) { + limitString = query.substring(limitIndex); + } + if (inputQuery.contains(" where ")) { + String newQuery = query.replace(limitString, " AND 1=1"); + if (newQuery.toLowerCase().contains(" where and 1=1")) { + return query.replace(limitString, " 1=1"); + } + return newQuery; + } + return query.replace(limitString, " where 1=1"); + } + + @Override + public String constructSampleClause() { + long sampleRowCount = this.getSampleRecordCount(); + if (sampleRowCount >= 0) { + return " limit " + sampleRowCount; + } + return ""; + } + + @Override + public String getLeftDelimitedIdentifier() { + return this.enableDelimitedIdentifier ? "`" : ""; + } + + @Override + public String getRightDelimitedIdentifier() { + return this.enableDelimitedIdentifier ? "`" : ""; + } + + @Override + public Iterator<JsonElement> getRecordSetFromSourceApi(String schema, String entity, WorkUnit workUnit, + List<Predicate> predicateList) + throws IOException { + return null; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e12fa76d/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java new file mode 100644 index 0000000..4b3ffbd --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/source/jdbc/PostgresqlExtractorTest.java @@ -0,0 +1,147 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.gobblin.source.jdbc; + +import java.sql.ResultSet; +import java.sql.Types; +import java.util.List; + +import org.apache.commons.lang.StringUtils; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.source.extractor.extract.CommandOutput; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableList; +import com.mockrunner.mock.jdbc.MockResultSet; + +import static org.testng.Assert.assertEquals; + + +@Test(groups = {"gobblin.source.jdbc"}) +public class PostgresqlExtractorTest { + + private final static List<MockJdbcColumn> COLUMNS = ImmutableList + .of(new MockJdbcColumn("id", "1", Types.INTEGER), new MockJdbcColumn("name", "name_1", Types.VARCHAR), + new MockJdbcColumn("age", "20", Types.INTEGER)); + + private static final String QUERY_1 = "SELECT * FROM x WHERE LIMIT 532"; + private static final String QUERY_2 = "SELECT * FROM x WHERE x.a < 10 LIMIT 50"; + private static final String QUERY_3 = "SELECT * FROM x WHERE x.a < 10 AND x.b = 20 LIMIT 50"; + private static final String QUERY_EMPTY = ""; + private static final String QUERY_REG = "SELECT * FROM x WHERE x.a < 10"; + + private CommandOutput<JdbcCommand, ResultSet> output; + private State state; + private PostgresqlExtractor postgresqlExtractor; + + @BeforeClass + public void setup() { + output = new JdbcCommandOutput(); + try { + output.put(new JdbcCommand(), buildMockResultSet()); + } catch (Exception e) { + // hack for test failure + assertEquals("PostgresqlExtractorTest: error initializing mock result set", "false"); + } + state = new WorkUnitState(); + state.setId("id"); + postgresqlExtractor = new PostgresqlExtractor((WorkUnitState) state); + } + + @Test + public void testConstructSampleClause() + throws Exception { + String sClause = postgresqlExtractor.constructSampleClause(); + assertEquals(sClause.trim(), (" limit " + postgresqlExtractor.getSampleRecordCount()).trim()); + } + + @Test + public void testRemoveSampleClauseFromQuery() + throws Exception { + String q1Expected = "SELECT * FROM x WHERE 1=1"; + String q2Expected = "SELECT * FROM x WHERE x.a < 10 AND 1=1"; + String q3Expected = "SELECT * FROM x WHERE x.a < 10 AND x.b = 20 AND 1=1"; + + String q1Parsed = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_1); + String q2Parsed = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_2); + String q3Parsed = postgresqlExtractor.removeSampleClauseFromQuery(QUERY_3); + + assertEquals(q1Parsed, q1Expected); + assertEquals(q2Parsed, q2Expected); + assertEquals(q3Parsed, q3Expected); + } + + @Test + public void testExractSampleRecordCountFromQuery() + throws Exception { + long res1 = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_1); + long res2 = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_2); + long res3 = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_3); + long res4 = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_EMPTY); + long res5 = postgresqlExtractor.exractSampleRecordCountFromQuery(QUERY_REG); + + assertEquals(res1, (long) 532); + assertEquals(res2, (long) 50); + assertEquals(res3, (long) 50); + assertEquals(res4, (long) -1); + assertEquals(res5, (long) -1); + } + + @Test + public void testHourPredicateCondition() + throws Exception { + String res1 = postgresqlExtractor.getHourPredicateCondition("my_time", 24L, "h", ">"); + String res2 = postgresqlExtractor.getHourPredicateCondition("my_time", 23L, "HH", ">"); + String res3 = postgresqlExtractor.getHourPredicateCondition("my_time", 2L, "h", ">"); + + assertEquals(res1, "my_time > '00'"); + assertEquals(res2, "my_time > '23'"); + assertEquals(res3, "my_time > '02'"); + } + + @Test + public void testDatePredicateCondition() + throws Exception { + String res1 = postgresqlExtractor.getDatePredicateCondition("my_date", 12061992L, "ddMMyyyy", ">"); + + assertEquals(res1, "my_date > '1992-06-12'"); + } + + @Test + public void testTimePredicateCondition() + throws Exception { + String res1 = postgresqlExtractor.getTimestampPredicateCondition("my_date", 12061992080809L, "ddMMyyyyhhmmss", ">"); + + assertEquals(res1, "my_date > '1992-06-12 08:08:09'"); + } + + /** + * Build a mock implementation of Result using Mockito + */ + private ResultSet buildMockResultSet() + throws Exception { + + MockResultSet mrs = new MockResultSet(StringUtils.EMPTY); + for (MockJdbcColumn column : COLUMNS) { + mrs.addColumn(column.getColumnName(), ImmutableList.of(column.getValue())); + } + return mrs; + } +} \ No newline at end of file
