PHOENIX-3538 Regex bulk loader

Add bulk loader which parses input based on a regular expression.
Contributed by [email protected] Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/d18da38a Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/d18da38a Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/d18da38a Branch: refs/heads/omid Commit: d18da38afa0d7bbc0221f6472bc3b037edc6e3d4 Parents: b5cf5aa Author: Gabriel Reid <[email protected]> Authored: Sun Feb 19 20:28:14 2017 +0100 Committer: Gabriel Reid <[email protected]> Committed: Mon Feb 20 08:17:57 2017 +0100 ---------------------------------------------------------------------- .../phoenix/end2end/RegexBulkLoadToolIT.java | 371 +++++++++++++++++++ .../phoenix/mapreduce/RegexBulkLoadTool.java | 74 ++++ .../mapreduce/RegexToKeyValueMapper.java | 135 +++++++ .../phoenix/util/regex/RegexUpsertExecutor.java | 80 ++++ 4 files changed, 660 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java new file mode 100644 index 0000000..47b0db7 --- /dev/null +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/RegexBulkLoadToolIT.java @@ -0,0 +1,371 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.end2end; + +import static org.apache.phoenix.query.QueryServices.DATE_FORMAT_ATTRIB; +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import java.io.PrintWriter; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.Statement; + +import org.apache.hadoop.fs.FSDataOutputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapred.FileAlreadyExistsException; +import org.apache.phoenix.mapreduce.RegexBulkLoadTool; +import org.apache.phoenix.util.DateUtil; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.ReadOnlyProps; +import org.apache.phoenix.util.TestUtil; +import org.junit.BeforeClass; +import org.junit.Test; + +public class RegexBulkLoadToolIT extends BaseOwnClusterIT { + + private static Connection conn; + private static String zkQuorum; + + @BeforeClass + public static void doSetup() throws Exception { + setUpTestDriver(ReadOnlyProps.EMPTY_PROPS); + zkQuorum = TestUtil.LOCALHOST + PhoenixRuntime.JDBC_PROTOCOL_SEPARATOR + getUtility().getZkCluster().getClientPort(); + conn = DriverManager.getConnection(getUrl()); + } + + @Test + public void testBasicImport() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE S.TABLE1 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, 
T DATE) SPLIT ON (1,2)"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.println("2,Name 2,1970/01/02"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv", + "--table", "table1", + "--schema", "s", + "--regex", "([^,]*),([^,]*),([^,]*)", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM s.table1 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), rs.getDate(3)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testFullOptionImport() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE2 (ID INTEGER NOT NULL PRIMARY KEY, " + + "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input2.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1|Name 1a;Name 1b,true"); + printWriter.println("2|Name 2a;Name 2b,false"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + int exitCode = regexBulkLoadTool.run(new String[] { + 
"--input", "/tmp/input2.csv", + "--table", "table2", + "--zookeeper", zkQuorum, + "--array-delimiter", ";", + "--regex", "([^|]*)\\|([^,]*),([^,]*)", + "--import-columns", "ID,NAMES,FLAG"}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, names FROM table2 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertArrayEquals(new Object[] { "Name 1a", "Name 1b" }, (Object[]) rs.getArray(2).getArray()); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertArrayEquals(new Object[] { "Name 2a", "Name 2b" }, (Object[]) rs.getArray(2).getArray()); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testMultipleInputFiles() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE7 (ID INTEGER NOT NULL PRIMARY KEY, NAME VARCHAR, T DATE) SPLIT ON (1,2)"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input1.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,Name 1,1970/01/01"); + printWriter.close(); + outputStream = fs.create(new Path("/tmp/input2.csv")); + printWriter = new PrintWriter(outputStream); + printWriter.println("2,Name 2,1970/01/02"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + regexBulkLoadTool.getConf().set(DATE_FORMAT_ATTRIB,"yyyy/MM/dd"); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input1.csv,/tmp/input2.csv", + "--table", "table7", + "--regex", "([^,]*),([^,]*),([^,]*)", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, name, t FROM table7 ORDER BY id"); + assertTrue(rs.next()); + assertEquals(1, rs.getInt(1)); + assertEquals("Name 1", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-01"), 
rs.getDate(3)); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("Name 2", rs.getString(2)); + assertEquals(DateUtil.parseDate("1970-01-02"), rs.getDate(3)); + assertFalse(rs.next()); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportWithIndex() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE3 (ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = "CREATE INDEX TABLE3_IDX ON TABLE3 " + + " (FIRST_NAME ASC)" + + " INCLUDE (LAST_NAME)"; + stmt.execute(ddl); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table3", + "--regex", "([^,]*),([^,]*),([^,]*)", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE3 where first_name='FirstName 2'"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("FirstName 2", rs.getString(2)); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportWithLocalIndex() throws Exception { + + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE6 (ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR) SPLIt ON (1,2)"); + String ddl = "CREATE LOCAL INDEX TABLE6_IDX ON TABLE6 " + + " (FIRST_NAME ASC)"; + stmt.execute(ddl); + ddl = "CREATE LOCAL INDEX TABLE6_IDX2 ON TABLE6 " + " (LAST_NAME ASC)"; + stmt.execute(ddl); + + FileSystem fs = 
FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input3.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1:LastName 1"); + printWriter.println("2,FirstName 2:LastName 2"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input3.csv", + "--table", "table6", + "--regex", "([^,]*),([^:]*):([^,]*)", + "--zookeeper", zkQuorum}); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT id, FIRST_NAME FROM TABLE6 where first_name='FirstName 2'"); + assertTrue(rs.next()); + assertEquals(2, rs.getInt(1)); + assertEquals("FirstName 2", rs.getString(2)); + + rs.close(); + stmt.close(); + } + + @Test + public void testImportOneIndexTable() throws Exception { + testImportOneIndexTable("TABLE4", false); + } + + @Test + public void testImportOneLocalIndexTable() throws Exception { + testImportOneIndexTable("TABLE5", true); + } + + public void testImportOneIndexTable(String tableName, boolean localIndex) throws Exception { + + String indexTableName = String.format("%s_IDX", tableName); + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + String ddl = + "CREATE " + (localIndex ? 
"LOCAL" : "") + " INDEX " + indexTableName + " ON " + + tableName + "(FIRST_NAME ASC)"; + stmt.execute(ddl); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input4.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input4.csv", + "--table", tableName, + "--regex", "([^,]*),([^,]*),([^,]*)", + "--index-table", indexTableName, + "--zookeeper", zkQuorum }); + assertEquals(0, exitCode); + + ResultSet rs = stmt.executeQuery("SELECT * FROM " + tableName); + assertFalse(rs.next()); + rs = stmt.executeQuery("SELECT FIRST_NAME FROM " + tableName + " where FIRST_NAME='FirstName 1'"); + assertTrue(rs.next()); + assertEquals("FirstName 1", rs.getString(1)); + + rs.close(); + stmt.close(); + } + + @Test + public void testInvalidArguments() { + String tableName = "TABLE8"; + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + try { + regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input4.csv", + "--table", tableName, + "--regex", "([^,]*),([^,]*),([^,]*)", + "--zookeeper", zkQuorum }); + fail(String.format("Table %s not created, hence should fail",tableName)); + } catch (Exception ex) { + assertTrue(ex instanceof IllegalArgumentException); + assertTrue(ex.getMessage().contains(String.format("Table %s not found", tableName))); + } + } + + @Test + public void testAlreadyExistsOutputPath() { + String tableName = "TABLE9"; + String outputPath = "/tmp/output/tabl9"; + try { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE " + tableName + "(ID INTEGER NOT NULL 
PRIMARY KEY, " + + "FIRST_NAME VARCHAR, LAST_NAME VARCHAR)"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + fs.create(new Path(outputPath)); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input9.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1,FirstName 1,LastName 1"); + printWriter.println("2,FirstName 2,LastName 2"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input9.csv", + "--output", outputPath, + "--table", tableName, + "--regex", "([^,]*),([^,]*),([^,]*)", + "--zookeeper", zkQuorum }); + + fail(String.format("Output path %s already exists. hence, should fail",outputPath)); + } catch (Exception ex) { + assertTrue(ex instanceof FileAlreadyExistsException); + } + } + + @Test + public void testInvalidRegex() throws Exception { + Statement stmt = conn.createStatement(); + stmt.execute("CREATE TABLE TABLE10 (ID INTEGER NOT NULL PRIMARY KEY, " + + "NAME VARCHAR, NAMES VARCHAR ARRAY, FLAG BOOLEAN)"); + + FileSystem fs = FileSystem.get(getUtility().getConfiguration()); + FSDataOutputStream outputStream = fs.create(new Path("/tmp/input10.csv")); + PrintWriter printWriter = new PrintWriter(outputStream); + printWriter.println("1|Name 1a;Name 1b,true"); + printWriter.println("2|Name 2a;Name 2b"); + printWriter.close(); + + RegexBulkLoadTool regexBulkLoadTool = new RegexBulkLoadTool(); + regexBulkLoadTool.setConf(getUtility().getConfiguration()); + int exitCode = regexBulkLoadTool.run(new String[] { + "--input", "/tmp/input10.csv", + "--table", "table10", + "--zookeeper", zkQuorum, + "--array-delimiter", ";", + "--regex", "([^|]*)\\|([^,]*),([^,]*)", + "--import-columns", "ID,NAMES,FLAG"}); + assertEquals(-1, exitCode); + stmt.close(); + } +} \ No newline at end of file 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java new file mode 100644 index 0000000..94544c9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexBulkLoadTool.java @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.cli.CommandLine; +import org.apache.commons.cli.Option; +import org.apache.commons.cli.Options; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.util.ToolRunner; +import org.apache.phoenix.util.ColumnInfo; + +/** + * A tool for running MapReduce-based ingests of input data based on regex. + * Lists are converted into typed ARRAYS. 
+ */ +public class RegexBulkLoadTool extends AbstractBulkLoadTool { + + static final Option REGEX_OPT = new Option("r", "regex", true, "Input regex String, defaults is (.*)"); + static final Option ARRAY_DELIMITER_OPT = new Option("a", "array-delimiter", true, "Array element delimiter (optional), defaults is ','"); + + @Override + protected Options getOptions() { + Options options = super.getOptions(); + options.addOption(REGEX_OPT); + options.addOption(ARRAY_DELIMITER_OPT); + return options; + } + + @Override + protected void configureOptions(CommandLine cmdLine, List<ColumnInfo> importColumns, + Configuration conf) throws SQLException { + if (cmdLine.hasOption(REGEX_OPT.getOpt())) { + String regexString = cmdLine.getOptionValue(REGEX_OPT.getOpt()); + conf.set(RegexToKeyValueMapper.REGEX_CONFKEY, regexString); + } + + if (cmdLine.hasOption(ARRAY_DELIMITER_OPT.getOpt())) { + String arraySeparator = cmdLine.getOptionValue(ARRAY_DELIMITER_OPT.getOpt()); + conf.set(RegexToKeyValueMapper.ARRAY_DELIMITER_CONFKEY, arraySeparator); + } + } + + @Override + protected void setupJob(Job job) { + // Allow overriding the job jar setting by using a -D system property at startup + if (job.getJar() == null) { + job.setJarByClass(RegexToKeyValueMapper.class); + } + job.setMapperClass(RegexToKeyValueMapper.class); + } + + public static void main(String[] args) throws Exception { + ToolRunner.run(new RegexBulkLoadTool(), args); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java new file mode 100644 index 0000000..f63923d --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/RegexToKeyValueMapper.java @@ -0,0 +1,135 
@@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.schema.types.PDataType; +import org.apache.phoenix.schema.types.PTimestamp; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.UpsertExecutor; +import org.apache.phoenix.util.regex.RegexUpsertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +/** + * MapReduce mapper that converts input lines into KeyValues based on the Regex that can be written to HFiles. + * <p/> + * KeyValues are produced by executing UPSERT statements on a Phoenix connection and then + * extracting the created KeyValues and rolling back the statement execution before it is + * committed to HBase. 
+ */ +public class RegexToKeyValueMapper extends FormatToBytesWritableMapper<Map<?, ?>> { + + protected static final Logger LOG = LoggerFactory.getLogger(RegexToKeyValueMapper.class); + + /** Configuration key for the regex */ + public static final String REGEX_CONFKEY = "phoenix.mapreduce.import.regex"; + + /** Configuration key for the array element delimiter for input arrays */ + public static final String ARRAY_DELIMITER_CONFKEY = "phoenix.mapreduce.import.arraydelimiter"; + + /** Configuration key for default array delimiter */ + public static final String ARRAY_DELIMITER_DEFAULT = ","; + + private LineParser<Map<?, ?>> lineParser; + + @Override + protected LineParser<Map<?, ?>> getLineParser() { + return lineParser; + } + + @Override + protected void setup(Context context) throws IOException, InterruptedException { + super.setup(context); + } + + @VisibleForTesting + @Override + protected UpsertExecutor<Map<?, ?>, ?> buildUpsertExecutor(Configuration conf) { + String tableName = conf.get(TABLE_NAME_CONFKEY); + Preconditions.checkNotNull(tableName, "table name is not configured"); + + String regex = conf.get(REGEX_CONFKEY); + Preconditions.checkNotNull(regex, "regex is not configured"); + + List<ColumnInfo> columnInfoList = buildColumnInfoList(conf); + + String arraySeparator = conf.get(ARRAY_DELIMITER_CONFKEY, ARRAY_DELIMITER_DEFAULT); + + lineParser = new RegexLineParser(regex, columnInfoList, arraySeparator); + + return new RegexUpsertExecutor(conn, tableName, columnInfoList, upsertListener); + } + + /** + * Parses a single input line with regex, returning a {@link Map} objects. 
+ */ + @VisibleForTesting + static class RegexLineParser implements LineParser<Map<?, ?>> { + private Pattern inputPattern; + private List<ColumnInfo> columnInfoList; + private String arraySeparator; + + public RegexLineParser(String regex, List<ColumnInfo> columnInfo, String arraySep) { + inputPattern = Pattern.compile(regex); + columnInfoList = columnInfo; + arraySeparator = arraySep; + } + + /** + * based on the regex and input, providing mapping between schema and input + */ + @Override + public Map<?, ?> parse(String input) throws IOException { + Map<String, Object> data = new HashMap<>(); + Matcher m = inputPattern.matcher(input); + if (m.groupCount() != columnInfoList.size()) { + LOG.debug(String.format("based on the regex and input, input fileds %s size doesn't match the table columns %s size", m.groupCount(), columnInfoList.size())); + return data; + } + + if (m.find( )) { + for (int i = 0; i < columnInfoList.size(); i++) { + ColumnInfo columnInfo = columnInfoList.get(i); + String colName = columnInfo.getColumnName(); + String value = m.group(i + 1); + PDataType pDataType = PDataType.fromTypeId(columnInfo.getSqlType()); + if (pDataType.isArrayType()) { + data.put(colName, Arrays.asList(value.split(arraySeparator))); + } else if (pDataType.isCoercibleTo(PTimestamp.INSTANCE)) { + data.put(colName, value); + } else { + data.put(colName, pDataType.toObject(value)); + } + } + } + return data; + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/phoenix/blob/d18da38a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java new file mode 100644 index 0000000..0388d9c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/regex/RegexUpsertExecutor.java 
@@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.util.regex; + +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.util.List; +import java.util.Map; + +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.UpsertExecutor; +import org.apache.phoenix.util.json.JsonUpsertExecutor; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +/** {@link UpsertExecutor} over {@link Map} objects, convert input record into {@link Map} objects by using regex. */ +public class RegexUpsertExecutor extends JsonUpsertExecutor { + + protected static final Logger LOG = LoggerFactory.getLogger(RegexUpsertExecutor.class); + + /** Testing constructor. Do not use in prod. 
*/ + @VisibleForTesting + protected RegexUpsertExecutor(Connection conn, List<ColumnInfo> columnInfoList, + PreparedStatement stmt, UpsertListener<Map<?, ?>> upsertListener) { + super(conn, columnInfoList, stmt, upsertListener); + } + + public RegexUpsertExecutor(Connection conn, String tableName, List<ColumnInfo> columnInfoList, + UpsertExecutor.UpsertListener<Map<?, ?>> upsertListener) { + super(conn, tableName, columnInfoList, upsertListener); + } + + @Override + protected void execute(Map<?, ?> record) { + int fieldIndex = 0; + String colName = null; + try { + if (record.size() < conversionFunctions.size()) { + String message = String.format("Input record does not have enough values based on regex (has %d, but needs %d)", + record.size(), conversionFunctions.size()); + throw new IllegalArgumentException(message); + } + for (fieldIndex = 0; fieldIndex < conversionFunctions.size(); fieldIndex++) { + colName = columnInfos.get(fieldIndex).getColumnName(); + Object sqlValue = conversionFunctions.get(fieldIndex).apply(record.get(colName)); + if (sqlValue != null) { + preparedStatement.setObject(fieldIndex + 1, sqlValue); + } else { + preparedStatement.setNull(fieldIndex + 1, dataTypes.get(fieldIndex).getSqlType()); + } + } + preparedStatement.execute(); + upsertListener.upsertDone(++upsertCount); + } catch (Exception e) { + if (LOG.isDebugEnabled()) { + // Even though this is an error we only log it with debug logging because we're notifying the + // listener, and it can do its own logging if needed + LOG.debug("Error on record " + record + ", fieldIndex " + fieldIndex + ", colName " + colName, e); + } + upsertListener.errorOnRecord(record, new Exception("fieldIndex: " + fieldIndex + ", colName " + colName, e)); + } + } +} \ No newline at end of file
