http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java index 500e403..eb2c124 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseStorage.java @@ -20,6 +20,8 @@ package org.apache.phoenix.pig; import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; +import java.sql.SQLException; +import java.util.List; import java.util.Properties; import org.apache.commons.cli.CommandLine; @@ -28,13 +30,17 @@ import org.apache.commons.cli.GnuParser; import org.apache.commons.cli.HelpFormatter; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.OutputFormat; import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.phoenix.pig.hadoop.PhoenixOutputFormat; -import org.apache.phoenix.pig.hadoop.PhoenixRecord; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.pig.writable.PhoenixPigDBWritable; +import org.apache.phoenix.util.ColumnInfo; import org.apache.pig.ResourceSchema; import org.apache.pig.ResourceSchema.ResourceFieldSchema; import org.apache.pig.StoreFuncInterface; @@ -75,74 +81,76 @@ import org.apache.pig.impl.util.UDFContext; @SuppressWarnings("rawtypes") public class PhoenixHBaseStorage implements StoreFuncInterface { - private PhoenixPigConfiguration config; - private RecordWriter<NullWritable, PhoenixRecord> writer; - private String contextSignature = null; - private ResourceSchema schema; - private long batchSize; - private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat(); - - // Set of options permitted - private final static Options validOptions = new Options(); - private final static CommandLineParser parser = new GnuParser(); - private final static String SCHEMA = "_schema"; - - private final CommandLine configuredOptions; - private final String server; - - public PhoenixHBaseStorage(String server) throws ParseException { - this(server, null); - } - - public PhoenixHBaseStorage(String server, String optString) - throws ParseException { - populateValidOptions(); - this.server = server; - - String[] optsArr = optString == null ? new String[0] : optString.split(" "); - try { - configuredOptions = parser.parse(validOptions, optsArr); - } catch (ParseException e) { - HelpFormatter formatter = new HelpFormatter(); - formatter.printHelp("[-batchSize]", validOptions); - throw e; - } - - batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize")); - } - - private static void populateValidOptions() { - validOptions.addOption("batchSize", true, "Specify upsert batch size"); - } - - /** - * Returns UDFProperties based on <code>contextSignature</code>. - */ - private Properties getUDFProperties() { - return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature }); - } - - - /** - * Parse the HBase table name and configure job - */ - @Override - public void setStoreLocation(String location, Job job) throws IOException { - URI locationURI; + private Configuration config; + private RecordWriter<NullWritable, PhoenixPigDBWritable> writer; + private List<ColumnInfo> columnInfo = null; + private String contextSignature = null; + private ResourceSchema schema; + private long batchSize; + private final PhoenixOutputFormat outputFormat = new PhoenixOutputFormat(); + + // Set of options permitted + private final static Options validOptions = new Options(); + private final static CommandLineParser parser = new GnuParser(); + private final static String SCHEMA = "_schema"; + + private final CommandLine configuredOptions; + private final String server; + + public PhoenixHBaseStorage(String server) throws ParseException { + this(server, null); + } + + public PhoenixHBaseStorage(String server, String optString) + throws ParseException { + populateValidOptions(); + this.server = server; + + String[] optsArr = optString == null ? new String[0] : optString.split(" "); + try { + configuredOptions = parser.parse(validOptions, optsArr); + } catch (ParseException e) { + HelpFormatter formatter = new HelpFormatter(); + formatter.printHelp("[-batchSize]", validOptions); + throw e; + } + batchSize = Long.parseLong(configuredOptions.getOptionValue("batchSize")); + } + + private static void populateValidOptions() { + validOptions.addOption("batchSize", true, "Specify upsert batch size"); + } + + /** + * Returns UDFProperties based on <code>contextSignature</code>. + */ + private Properties getUDFProperties() { + return UDFContext.getUDFContext().getUDFProperties(this.getClass(), new String[] { contextSignature }); + } + + + /** + * Parse the HBase table name and configure job + */ + @Override + public void setStoreLocation(String location, Job job) throws IOException { + URI locationURI; try { locationURI = new URI(location); if (!"hbase".equals(locationURI.getScheme())) { throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location)); } + config = job.getConfiguration(); + config.set(HConstants.ZOOKEEPER_QUORUM, server); String tableName = locationURI.getAuthority(); // strip off the leading path token '/' String columns = null; if(!locationURI.getPath().isEmpty()) { columns = locationURI.getPath().substring(1); + PhoenixConfigurationUtil.setUpsertColumnNames(config, columns); } - config = new PhoenixPigConfiguration(job.getConfiguration()); - config.configure(server, tableName, batchSize, columns); - + PhoenixConfigurationUtil.setOutputTableName(config,tableName); + PhoenixConfigurationUtil.setBatchSize(config,batchSize); String serializedSchema = getUDFProperties().getProperty(contextSignature + SCHEMA); if (serializedSchema != null) { schema = (ResourceSchema) ObjectSerializer.deserialize(serializedSchema); @@ -150,59 +158,61 @@ public class PhoenixHBaseStorage implements StoreFuncInterface { } catch (URISyntaxException e) { throw new IOException(String.format("Location must use the hbase protocol, hbase://tableName[/columnList]. Supplied location=%s",location),e); } - } + } - @SuppressWarnings("unchecked") + @SuppressWarnings("unchecked") @Override - public void prepareToWrite(RecordWriter writer) throws IOException { - this.writer =writer; - } + public void prepareToWrite(RecordWriter writer) throws IOException { + this.writer = writer; + try { + this.columnInfo = PhoenixConfigurationUtil.getUpsertColumnMetadataList(this.config); + } catch(SQLException sqle) { + throw new IOException(sqle); + } + } - @Override - public void putNext(Tuple t) throws IOException { + @Override + public void putNext(Tuple t) throws IOException { ResourceFieldSchema[] fieldSchemas = (schema == null) ? null : schema.getFields(); - - PhoenixRecord record = new PhoenixRecord(fieldSchemas); - + PhoenixPigDBWritable record = PhoenixPigDBWritable.newInstance(fieldSchemas,this.columnInfo); for(int i=0; i<t.size(); i++) { - record.add(t.get(i)); + record.add(t.get(i)); + } + try { + this.writer.write(null, record); + } catch (InterruptedException e) { + throw new RuntimeException(e); } - try { - writer.write(null, record); - } catch (InterruptedException e) { - throw new RuntimeException(e); - } - - } + } - @Override - public void setStoreFuncUDFContextSignature(String signature) { + @Override + public void setStoreFuncUDFContextSignature(String signature) { this.contextSignature = signature; - } - - @Override - public void cleanupOnFailure(String location, Job job) throws IOException { - } - - @Override - public void cleanupOnSuccess(String location, Job job) throws IOException { - } - - @Override - public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { - return location; - } - - @Override - public OutputFormat getOutputFormat() throws IOException { - return outputFormat; - } - - @Override - public void checkSchema(ResourceSchema s) throws IOException { - schema = s; - getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema)); - } + } + + @Override + public void cleanupOnFailure(String location, Job job) throws IOException { + } + + @Override + public void cleanupOnSuccess(String location, Job job) throws IOException { + } + + @Override + public String relToAbsPathForStoreLocation(String location, Path curDir) throws IOException { + return location; + } + + @Override + public OutputFormat getOutputFormat() throws IOException { + return outputFormat; + } + + @Override + public void checkSchema(ResourceSchema s) throws IOException { + schema = s; + getUDFProperties().setProperty(contextSignature + SCHEMA, ObjectSerializer.serialize(schema)); + } } \ No newline at end of file
http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java deleted file mode 100644 index c6b6ec9..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixPigConfiguration.java +++ /dev/null @@ -1,340 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pig; - -import static org.apache.commons.lang.StringUtils.isNotEmpty; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.sql.SQLException; -import java.util.List; -import java.util.Properties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.phoenix.jdbc.PhoenixConnection; -import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder; -import org.apache.phoenix.util.ColumnInfo; -import org.apache.phoenix.util.PhoenixRuntime; -import org.apache.phoenix.util.QueryUtil; - -import com.google.common.annotations.VisibleForTesting; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Lists; - - -/** - * A container for configuration to be used with {@link PhoenixHBaseStorage} and {@link PhoenixHBaseLoader} - * - */ -public class PhoenixPigConfiguration { - - private static final Log LOG = LogFactory.getLog(PhoenixPigConfiguration.class); - - private PhoenixPigConfigurationUtil util; - - /** - * Speculative execution of Map tasks - */ - public static final String MAP_SPECULATIVE_EXEC = "mapred.map.tasks.speculative.execution"; - - /** - * Speculative execution of Reduce tasks - */ - public static final String REDUCE_SPECULATIVE_EXEC = "mapred.reduce.tasks.speculative.execution"; - - public static final String SERVER_NAME = "phoenix.hbase.server.name"; - - public static final String TABLE_NAME = "phoenix.hbase.table.name"; - - public static final String UPSERT_COLUMNS = "phoenix.hbase.upsert.columns"; - - public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt"; - - public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list"; - - public static final String SELECT_STATEMENT = "phoenix.select.stmt"; - - public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size"; - - //columns projected given as part of LOAD. - public static final String SELECT_COLUMNS = "phoneix.select.query.columns"; - - public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list"; - - public static final String SCHEMA_TYPE = "phoenix.select.schema.type"; - - // the delimiter supported during LOAD and STORE when projected columns are given. - public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter"; - - public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000; - - public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ","; - - private final Configuration conf; - - public PhoenixPigConfiguration(Configuration conf) { - this.conf = conf; - this.util = new PhoenixPigConfigurationUtil(); - } - - public void configure(String server, String tableName, long batchSize) { - configure(server,tableName,batchSize,null); - } - - public void configure(String server, String tableName, long batchSize, String columns) { - conf.set(SERVER_NAME, server); - conf.set(TABLE_NAME, tableName); - conf.setLong(UPSERT_BATCH_SIZE, batchSize); - if (isNotEmpty(columns)) { - conf.set(UPSERT_COLUMNS, columns); - } - conf.setBoolean(MAP_SPECULATIVE_EXEC, false); - conf.setBoolean(REDUCE_SPECULATIVE_EXEC, false); - } - - - /** - * Creates a {@link Connection} with autoCommit set to false. - * @throws SQLException - */ - public Connection getConnection() throws SQLException { - return getUtil().getConnection(getConfiguration()); - } - - public String getUpsertStatement() throws SQLException { - return getUtil().getUpsertStatement(getConfiguration(), getTableName()); - } - - public long getBatchSize() throws SQLException { - return getUtil().getBatchSize(getConfiguration()); - } - - public String getServer() { - return conf.get(SERVER_NAME); - } - - public List<ColumnInfo> getColumnMetadataList() throws SQLException { - return getUtil().getUpsertColumnMetadataList(getConfiguration(), getTableName()); - } - - public String getUpsertColumns() { - return conf.get(UPSERT_COLUMNS); - } - - public String getTableName() { - return conf.get(TABLE_NAME); - } - - public Configuration getConfiguration() { - return this.conf; - } - - public String getSelectStatement() throws SQLException { - return getUtil().getSelectStatement(getConfiguration(), getTableName()); - } - - public List<ColumnInfo> getSelectColumnMetadataList() throws SQLException { - return getUtil().getSelectColumnMetadataList(getConfiguration(), getTableName()); - } - - public int getSelectColumnsCount() throws SQLException { - return getUtil().getSelectColumnsCount(getConfiguration(), getTableName()); - } - - public SchemaType getSchemaType() { - final String schemaTp = conf.get(SCHEMA_TYPE); - return SchemaType.valueOf(schemaTp); - } - - - public void setServerName(final String zookeeperQuorum) { - this.conf.set(SERVER_NAME, zookeeperQuorum); - } - - public void setTableName(final String tableName) { - Preconditions.checkNotNull(tableName, "HBase Table name cannot be null!"); - this.conf.set(TABLE_NAME, tableName); - } - - public void setSelectStatement(final String selectStatement) { - this.conf.set(SELECT_STATEMENT, selectStatement); - } - - public void setSelectColumns(String selectColumns) { - this.conf.set(SELECT_COLUMNS, selectColumns); - } - - public PhoenixPigConfigurationUtil getUtil() { - return this.util; - } - - public void setSchemaType(final SchemaType schemaType) { - this.conf.set(SCHEMA_TYPE, schemaType.name()); - } - - public enum SchemaType { - TABLE, - QUERY; - } - - - @VisibleForTesting - static class PhoenixPigConfigurationUtil { - - public Connection getConnection(final Configuration configuration) throws SQLException { - Preconditions.checkNotNull(configuration); - Properties props = new Properties(); - final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(SERVER_NAME)), props).unwrap(PhoenixConnection.class); - conn.setAutoCommit(false); - return conn; - } - - public List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException { - Preconditions.checkNotNull(configuration); - Preconditions.checkNotNull(tableName); - final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY); - if(isNotEmpty(columnInfoStr)) { - return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); - } - final Connection connection = getConnection(configuration); - String upsertColumns = configuration.get(UPSERT_COLUMNS); - List<String> upsertColumnList = null; - if(isNotEmpty(upsertColumns)) { - final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER); - upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns)); - LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s " - ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList) - )); - } - List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList); - final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); - // we put the encoded column infos in the Configuration for re usability. - configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos); - closeConnection(connection); - return columnMetadataList; - } - - public String getUpsertStatement(final Configuration configuration,final String tableName) throws SQLException { - Preconditions.checkNotNull(configuration); - Preconditions.checkNotNull(tableName); - String upsertStmt = configuration.get(UPSERT_STATEMENT); - if(isNotEmpty(upsertStmt)) { - return upsertStmt; - } - final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,"")); - final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration, tableName); - if (useUpsertColumns) { - // Generating UPSERT statement without column name information. - upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList); - LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt); - } else { - // Generating UPSERT statement without column name information. - upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size()); - LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt); - } - configuration.set(UPSERT_STATEMENT, upsertStmt); - return upsertStmt; - - } - - public List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration,final String tableName) throws SQLException { - Preconditions.checkNotNull(configuration); - Preconditions.checkNotNull(tableName); - final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY); - if(isNotEmpty(columnInfoStr)) { - return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); - } - final Connection connection = getConnection(configuration); - final List<String> selectColumnList = getSelectColumnList(configuration); - final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList); - final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); - // we put the encoded column infos in the Configuration for re usability. - configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos); - closeConnection(connection); - return columnMetadataList; - } - - private List<String> getSelectColumnList( - final Configuration configuration) { - String selectColumns = configuration.get(SELECT_COLUMNS); - List<String> selectColumnList = null; - if(isNotEmpty(selectColumns)) { - final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER); - selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns)); - LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s " - ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList) - )); - } - return selectColumnList; - } - - public String getSelectStatement(final Configuration configuration,final String tableName) throws SQLException { - Preconditions.checkNotNull(configuration); - Preconditions.checkNotNull(tableName); - String selectStmt = configuration.get(SELECT_STATEMENT); - if(isNotEmpty(selectStmt)) { - return selectStmt; - } - final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration, tableName); - selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList); - LOG.info("Select Statement: "+ selectStmt); - configuration.set(SELECT_STATEMENT, selectStmt); - return selectStmt; - } - - public long getBatchSize(final Configuration configuration) throws SQLException { - Preconditions.checkNotNull(configuration); - long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE); - if(batchSize <= 0) { - Connection conn = getConnection(configuration); - batchSize = ((PhoenixConnection) conn).getMutateBatchSize(); - closeConnection(conn); - } - configuration.setLong(UPSERT_BATCH_SIZE, batchSize); - return batchSize; - } - - public int getSelectColumnsCount(Configuration configuration, - String tableName) throws SQLException { - Preconditions.checkNotNull(configuration); - final String schemaTp = configuration.get(SCHEMA_TYPE); - final SchemaType schemaType = SchemaType.valueOf(schemaTp); - int count = 0; - if(SchemaType.QUERY.equals(schemaType)) { - List<String> selectedColumnList = getSelectColumnList(configuration); - count = selectedColumnList == null ? 0 : selectedColumnList.size(); - } else { - List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration,tableName); - count = columnInfos == null ? 0 : columnInfos.size(); - } - return count; - } - - private void closeConnection(final Connection connection) throws SQLException { - if(connection != null) { - connection.close(); - } - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java deleted file mode 100644 index 2ef7914..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputFormat.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig.hadoop; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; -import java.sql.Statement; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.pig.PhoenixPigConfiguration; -import org.apache.phoenix.query.KeyRange; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * The InputFormat class for generating the splits and creating the record readers. - * - */ -public final class PhoenixInputFormat extends InputFormat<NullWritable, PhoenixRecord> { - - private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class); - private PhoenixPigConfiguration phoenixConfiguration; - private Connection connection; - private QueryPlan queryPlan; - - /** - * instantiated by framework - */ - public PhoenixInputFormat() { - } - - @Override - public RecordReader<NullWritable, PhoenixRecord> createRecordReader(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - setConf(context.getConfiguration()); - final QueryPlan queryPlan = getQueryPlan(context); - try { - return new PhoenixRecordReader(phoenixConfiguration,queryPlan); - }catch(SQLException sqle) { - throw new IOException(sqle); - } - } - - - - @Override - public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { - setConf(context.getConfiguration()); - final QueryPlan queryPlan = getQueryPlan(context); - final List<KeyRange> allSplits = queryPlan.getSplits(); - final List<InputSplit> splits = generateSplits(queryPlan,allSplits); - return splits; - } - - private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException { - Preconditions.checkNotNull(qplan); - Preconditions.checkNotNull(splits); - final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); - for (List<Scan> scans : qplan.getScans()) { - psplits.add(new PhoenixInputSplit(scans)); - } - return psplits; - } - - public void setConf(Configuration configuration) { - this.phoenixConfiguration = new PhoenixPigConfiguration(configuration); - } - - public PhoenixPigConfiguration getConf() { - return this.phoenixConfiguration; - } - - private Connection getConnection() { - try { - if (this.connection == null) { - this.connection = phoenixConfiguration.getConnection(); - } - } catch (Exception e) { - throw new RuntimeException(e); - } - return connection; - } - - /** - * Returns the query plan associated with the select query. - * @param context - * @return - * @throws IOException - * @throws SQLException - */ - private QueryPlan getQueryPlan(final JobContext context) throws IOException { - Preconditions.checkNotNull(context); - if(queryPlan == null) { - try{ - final Connection connection = getConnection(); - final String selectStatement = getConf().getSelectStatement(); - Preconditions.checkNotNull(selectStatement); - final Statement statement = connection.createStatement(); - final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); - // Optimize the query plan so that we potentially use secondary indexes - this.queryPlan = pstmt.optimizeQuery(selectStatement); - // Initialize the query plan so it sets up the parallel scans - queryPlan.iterator(); - } catch(Exception exception) { - LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage())); - throw new RuntimeException(exception); - } - } - return queryPlan; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java deleted file mode 100644 index b1d015a..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixInputSplit.java +++ /dev/null @@ -1,134 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.List; - -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.protobuf.ProtobufUtil; -import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.phoenix.query.KeyRange; - -import com.google.common.base.Preconditions; -import com.google.common.collect.Lists; - -/** - * - * Input split class to hold the lower and upper bound range. {@link KeyRange} - * - */ -public class PhoenixInputSplit extends InputSplit implements Writable { - - private List<Scan> scans; - private KeyRange keyRange; - - /** - * No Arg constructor - */ - public PhoenixInputSplit() { - } - - /** - * - * @param keyRange - */ - public PhoenixInputSplit(final List<Scan> scans) { - Preconditions.checkNotNull(scans); - Preconditions.checkState(!scans.isEmpty()); - this.scans = scans; - init(); - } - - public List<Scan> getScans() { - return scans; - } - - public KeyRange getKeyRange() { - return keyRange; - } - - private void init() { - this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow()); - } - - @Override - public void readFields(DataInput input) throws IOException { - int count = WritableUtils.readVInt(input); - scans = Lists.newArrayListWithExpectedSize(count); - for (int i = 0; i < count; i++) { - byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)]; - input.readFully(protoScanBytes); - ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes); - Scan scan = ProtobufUtil.toScan(protoScan); - scans.add(scan); - } - init(); - } - - @Override - public void write(DataOutput output) throws IOException { - Preconditions.checkNotNull(scans); - WritableUtils.writeVInt(output, scans.size()); - for (Scan scan : scans) { - ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan); - byte[] protoScanBytes = protoScan.toByteArray(); - WritableUtils.writeVInt(output, protoScanBytes.length); - output.write(protoScanBytes); - } - } - - @Override - public long getLength() throws IOException, InterruptedException { - return 0; - } - - @Override - public String[] getLocations() throws IOException, InterruptedException { - return new String[]{}; - } - - @Override - public int hashCode() { - final int prime = 31; - int result = 1; - result = prime * result + keyRange.hashCode(); - return result; - } - - @Override - public boolean equals(Object obj) { - // TODO: review: it's a reasonable check to use the keyRange, - // but it's not perfect. Do we need an equals impl? - if (this == obj) { return true; } - if (obj == null) { return false; } - if (!(obj instanceof PhoenixInputSplit)) { return false; } - PhoenixInputSplit other = (PhoenixInputSplit)obj; - if (keyRange == null) { - if (other.keyRange != null) { return false; } - } else if (!keyRange.equals(other.keyRange)) { return false; } - return true; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java deleted file mode 100644 index a8d9d8f..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputCommitter.java +++ /dev/null @@ -1,111 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pig.hadoop; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import org.apache.phoenix.jdbc.PhoenixStatement; - -/** - * - * {@link OutputCommitter} implementation for Pig tasks using Phoenix - * connections to upsert to HBase - * - * - * - */ -public class PhoenixOutputCommitter extends OutputCommitter { - private final Log LOG = LogFactory.getLog(PhoenixOutputCommitter.class); - - private final PhoenixOutputFormat outputFormat; - - public PhoenixOutputCommitter(PhoenixOutputFormat outputFormat) { - if(outputFormat == null) { - throw new IllegalArgumentException("PhoenixOutputFormat must not be null."); - } - this.outputFormat = outputFormat; - } - - /** - * TODO implement rollback functionality. - * - * {@link PhoenixStatement#execute(String)} is buffered on the client, this makes - * it difficult to implement rollback as once a commit is issued it's hard to go - * back all the way to undo. - */ - @Override - public void abortTask(TaskAttemptContext context) throws IOException { - } - - @Override - public void commitTask(TaskAttemptContext context) throws IOException { - commit(outputFormat.getConnection(context.getConfiguration())); - } - - @Override - public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { - return true; - } - - @Override - public void setupJob(JobContext jobContext) throws IOException { - } - - @Override - public void setupTask(TaskAttemptContext context) throws IOException { - } - - /** - * Commit a transaction on task completion - * - * @param connection - * @throws IOException - */ - private void commit(Connection connection) throws IOException { - try { - if (connection == null || connection.isClosed()) { - throw new IOException("Trying to commit a connection that is null or closed: "+ connection); - } - } catch (SQLException e) { - throw new IOException("Exception calling isClosed on connection", e); - } - - try { - LOG.debug("Commit called on task completion"); - connection.commit(); - } catch (SQLException e) { - throw new IOException("Exception while trying to commit a connection. ", e); - } finally { - try { - LOG.debug("Closing connection to database on task completion"); - connection.close(); - } catch (SQLException e) { - LOG.warn("Exception while trying to close database connection", e); - } - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java deleted file mode 100644 index 9c29f8f..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixOutputFormat.java +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pig.hadoop; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.SQLException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.OutputCommitter; -import org.apache.hadoop.mapreduce.OutputFormat; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import org.apache.phoenix.pig.PhoenixPigConfiguration; - -/** - * {@link OutputFormat} implementation for Phoenix - * - * - * - */ -public class PhoenixOutputFormat extends OutputFormat<NullWritable, PhoenixRecord> { - private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class); - - private Connection connection; - private PhoenixPigConfiguration config; - - @Override - public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { - } - - /** - * TODO Implement {@link OutputCommitter} to rollback in case of task failure - */ - - @Override - public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { - return new PhoenixOutputCommitter(this); - } - - @Override - public RecordWriter<NullWritable, PhoenixRecord> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { - try { - return new PhoenixRecordWriter(getConnection(context.getConfiguration()), config); - } catch (SQLException e) { - throw new IOException(e); - } - } - - /** - * This method creates a database connection. A single instance is created - * and passed around for re-use. - * - * @param configuration - * @return - * @throws IOException - */ - synchronized Connection getConnection(Configuration configuration) throws IOException { - if (connection != null) { - return connection; - } - - config = new PhoenixPigConfiguration(configuration); - try { - LOG.info("Initializing new Phoenix connection..."); - connection = config.getConnection(); - LOG.info("Initialized Phoenix connection, autoCommit="+ connection.getAutoCommit()); - return connection; - } catch (SQLException e) { - throw new IOException(e); - } - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java deleted file mode 100644 index 5063ed0..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecord.java +++ /dev/null @@ -1,112 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pig.hadoop; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; - -import org.apache.hadoop.io.Writable; -import org.apache.phoenix.pig.util.TypeUtil; -import org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.util.ColumnInfo; -import org.apache.pig.ResourceSchema.ResourceFieldSchema; -import org.apache.pig.data.DataType; - -import com.google.common.base.Preconditions; - -/** - * A {@link Writable} representing a Phoenix record. This class - * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement} - * b) reads the column values from the {@link ResultSet} - * - */ -public class PhoenixRecord implements Writable { - - private final List<Object> values; - private final ResourceFieldSchema[] fieldSchemas; - - public PhoenixRecord() { - this(null); - } - - public PhoenixRecord(ResourceFieldSchema[] fieldSchemas) { - this.values = new ArrayList<Object>(); - this.fieldSchemas = fieldSchemas; - } - - @Override - public void readFields(DataInput in) throws IOException { - } - - @Override - public void write(DataOutput out) throws IOException { - } - - public void write(PreparedStatement statement, List<ColumnInfo> columnMetadataList) throws SQLException { - for (int i = 0; i < columnMetadataList.size(); i++) { - Object o = values.get(i); - ColumnInfo columnInfo = columnMetadataList.get(i); - byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType(); - try { - Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType()); - if (upsertValue != null) { - statement.setObject(i + 1, upsertValue, columnInfo.getSqlType()); - } else { - statement.setNull(i + 1, columnInfo.getSqlType()); - } - } catch (RuntimeException re) { - throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s" - ,columnInfo.toString(),re.getMessage()),re); - - } - } - - statement.execute(); - } - - public void read(final ResultSet rs, final int noOfColumns) throws SQLException { - Preconditions.checkNotNull(rs); - Preconditions.checkArgument(noOfColumns > 0, "No of arguments passed is <= 0"); - values.clear(); - for(int i = 1 ; i <= noOfColumns ; i++) { - Object obj = rs.getObject(i); - values.add(obj); - } - } - - public void add(Object value) { - values.add(value); - } - - private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) { - PDataType pDataType = PDataType.fromTypeId(sqlType); - - return TypeUtil.castPigTypeToPhoenix(o, type, pDataType); - } - - public List<Object> getValues() { - return values; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java deleted file mode 100644 index f6808a8..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordReader.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig.hadoop; - -import java.io.IOException; -import java.sql.SQLException; -import java.util.List; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; -import org.apache.phoenix.compile.QueryPlan; -import org.apache.phoenix.iterate.ConcatResultIterator; -import org.apache.phoenix.iterate.LookAheadResultIterator; -import org.apache.phoenix.iterate.PeekingResultIterator; -import org.apache.phoenix.iterate.ResultIterator; -import org.apache.phoenix.iterate.SequenceResultIterator; -import org.apache.phoenix.iterate.TableResultIterator; -import org.apache.phoenix.jdbc.PhoenixResultSet; -import org.apache.phoenix.pig.PhoenixPigConfiguration; - -import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; -import com.google.common.collect.Lists; - -/** - * RecordReader that process the scan and returns PhoenixRecord - * - */ -public final class PhoenixRecordReader extends RecordReader<NullWritable,PhoenixRecord>{ - - private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class); - private final PhoenixPigConfiguration phoenixConfiguration; - private final QueryPlan queryPlan; - private final int columnsCount; - private NullWritable key = NullWritable.get(); - private PhoenixRecord value = null; - private ResultIterator resultIterator = null; - private PhoenixResultSet resultSet; - - public PhoenixRecordReader(final PhoenixPigConfiguration pConfiguration,final QueryPlan qPlan) throws SQLException { - - Preconditions.checkNotNull(pConfiguration); - Preconditions.checkNotNull(qPlan); - this.phoenixConfiguration = pConfiguration; - this.queryPlan = qPlan; - this.columnsCount = phoenixConfiguration.getSelectColumnsCount(); - } - - @Override - public void close() throws IOException { - if(resultIterator != null) { - try { - resultIterator.close(); - } catch (SQLException e) { - LOG.error(" Error closing resultset."); - } - } - } - - @Override - public NullWritable getCurrentKey() throws IOException, InterruptedException { - return key; - } - - @Override - public PhoenixRecord getCurrentValue() throws IOException, InterruptedException { - return value; - } - - @Override - public float getProgress() throws IOException, InterruptedException { - return 0; - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { - final PhoenixInputSplit pSplit = (PhoenixInputSplit)split; - final List<Scan> scans = pSplit.getScans(); - try { - List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size()); - for (Scan scan : scans) { - final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan); - PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); - iterators.add(peekingResultIterator); - } - ResultIterator iterator = ConcatResultIterator.newIterator(iterators); - if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { - iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); - } - this.resultIterator = iterator; - this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement()); - } catch (SQLException e) { - LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage())); - Throwables.propagate(e); - } - - } - - @Override - public boolean nextKeyValue() throws IOException, InterruptedException { - if (key == null) { - key = NullWritable.get(); - } - if (value == null) { - value = new PhoenixRecord(); - } - Preconditions.checkNotNull(this.resultSet); - try { - if(!resultSet.next()) { - return false; - } - value.read(resultSet,columnsCount); - return true; - } catch (SQLException e) { - LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage())); - Throwables.propagate(e); - } - return false; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java deleted file mode 100644 index c980a38..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/hadoop/PhoenixRecordWriter.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.phoenix.pig.hadoop; - -import java.io.IOException; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.SQLException; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.io.NullWritable; -import org.apache.hadoop.mapreduce.RecordWriter; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import org.apache.phoenix.pig.PhoenixPigConfiguration; - -/** - * - * {@link RecordWriter} implementation for Phoenix - * - * - * - */ -public class PhoenixRecordWriter extends RecordWriter<NullWritable, PhoenixRecord> { - - private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class); - - private long numRecords = 0; - - private final Connection conn; - private final PreparedStatement statement; - private final PhoenixPigConfiguration config; - private final long batchSize; - - public PhoenixRecordWriter(Connection conn, PhoenixPigConfiguration config) throws SQLException { - this.conn = conn; - this.config = config; - this.batchSize = config.getBatchSize(); - this.statement = this.conn.prepareStatement(config.getUpsertStatement()); - } - - - /** - * Committing and closing the connection is handled by {@link PhoenixOutputCommitter}. - * - */ - @Override - public void close(TaskAttemptContext context) throws IOException, InterruptedException { - } - - @Override - public void write(NullWritable n, PhoenixRecord record) throws IOException, InterruptedException { - try { - record.write(statement, config.getColumnMetadataList()); - numRecords++; - - if (numRecords % batchSize == 0) { - LOG.debug("commit called on a batch of size : " + batchSize); - conn.commit(); - } - } catch (SQLException e) { - throw new IOException("Exception while committing to database.", e); - } - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java deleted file mode 100644 index 3ea9b5b..0000000 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoder.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig.util; - -import java.util.List; - -import org.apache.phoenix.util.ColumnInfo; - -import com.google.common.base.Function; -import com.google.common.base.Joiner; -import com.google.common.base.Preconditions; -import com.google.common.base.Splitter; -import com.google.common.collect.Iterables; -import com.google.common.collect.Lists; - -/** - * - * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back. - * - */ -public final class ColumnInfoToStringEncoderDecoder { - - private static final String COLUMN_INFO_DELIMITER = "|"; - - private ColumnInfoToStringEncoderDecoder() { - - } - - public static String encode(List<ColumnInfo> columnInfos) { - Preconditions.checkNotNull(columnInfos); - return Joiner.on(COLUMN_INFO_DELIMITER). - skipNulls().join(columnInfos); - } - - public static List<ColumnInfo> decode(final String columnInfoStr) { - Preconditions.checkNotNull(columnInfoStr); - List<ColumnInfo> columnInfos = Lists.newArrayList( - Iterables.transform( - Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr), - new Function<String, ColumnInfo>() { - @Override - public ColumnInfo apply(String colInfo) { - if (colInfo.isEmpty()) { - return null; - } - return ColumnInfo.fromString(colInfo); - } - })); - return columnInfos; - - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java index 9f8a5e4..4f7d776 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtil.java @@ -25,8 +25,9 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.phoenix.pig.PhoenixPigConfiguration; -import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.schema.PDataType; import org.apache.phoenix.util.ColumnInfo; import org.apache.pig.ResourceSchema; @@ -46,19 +47,20 @@ public final class PhoenixPigSchemaUtil { private PhoenixPigSchemaUtil() { } - public static ResourceSchema getResourceSchema(final PhoenixPigConfiguration phoenixConfiguration) throws IOException { +public static ResourceSchema getResourceSchema(final Configuration configuration) throws IOException { final ResourceSchema schema = new ResourceSchema(); try { - List<ColumnInfo> columns = null; - if(SchemaType.QUERY.equals(phoenixConfiguration.getSchemaType())) { - final String sqlQuery = phoenixConfiguration.getSelectStatement(); - Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration"); - final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(phoenixConfiguration); - columns = function.apply(sqlQuery); - } else { - columns = phoenixConfiguration.getSelectColumnMetadataList(); - } + List<ColumnInfo> columns = null; + final SchemaType schemaType = PhoenixConfigurationUtil.getSchemaType(configuration); + if(SchemaType.QUERY.equals(schemaType)) { + final String sqlQuery = PhoenixConfigurationUtil.getSelectStatement(configuration); + Preconditions.checkNotNull(sqlQuery, "No Sql Query exists within the configuration"); + final SqlQueryToColumnInfoFunction function = new SqlQueryToColumnInfoFunction(configuration); + columns = function.apply(sqlQuery); + } else { + columns = PhoenixConfigurationUtil.getSelectColumnMetadataList(configuration); + } ResourceFieldSchema fields[] = new ResourceFieldSchema[columns.size()]; int i = 0; for(ColumnInfo cinfo : columns) { @@ -76,6 +78,5 @@ public final class PhoenixPigSchemaUtil { } return schema; - } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java index 1b3a90a..f0148a6 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/QuerySchemaParserFunction.java @@ -26,16 +26,16 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.pig.PhoenixPigConfiguration; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; import com.google.common.base.Function; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; /** @@ -46,21 +46,20 @@ import com.google.common.collect.Lists; public class QuerySchemaParserFunction implements Function<String,Pair<String,String>> { private static final Log LOG = LogFactory.getLog(QuerySchemaParserFunction.class); - private PhoenixPigConfiguration phoenixConfiguration; + private final Configuration configuration; - public QuerySchemaParserFunction(PhoenixPigConfiguration phoenixConfiguration) { - Preconditions.checkNotNull(phoenixConfiguration); - this.phoenixConfiguration = phoenixConfiguration; + public QuerySchemaParserFunction(Configuration configuration) { + Preconditions.checkNotNull(configuration); + this.configuration = configuration; } @Override public Pair<String, String> apply(final String selectStatement) { Preconditions.checkNotNull(selectStatement); Preconditions.checkArgument(!selectStatement.isEmpty(), "Select Query is empty!!"); - Preconditions.checkNotNull(this.phoenixConfiguration); Connection connection = null; try { - connection = this.phoenixConfiguration.getConnection(); + connection = ConnectionUtil.getConnection(this.configuration); final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); final QueryPlan queryPlan = pstmt.compileQuery(selectStatement); @@ -78,17 +77,17 @@ public class QuerySchemaParserFunction implements Function<String,Pair<String,St return new Pair<String, String>(tableName, columnsAsStr); } catch (SQLException e) { LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),selectStatement)); - Throwables.propagate(e); + throw new RuntimeException(e); } finally { if(connection != null) { try { connection.close(); } catch(SQLException sqle) { - Throwables.propagate(sqle); + LOG.error(" Error closing connection "); + throw new RuntimeException(sqle); } } } - return null; } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java index 52f646c..3ed35bb 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/SqlQueryToColumnInfoFunction.java @@ -26,60 +26,59 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; import org.apache.phoenix.compile.ColumnProjector; import org.apache.phoenix.compile.QueryPlan; import org.apache.phoenix.jdbc.PhoenixStatement; -import org.apache.phoenix.pig.PhoenixPigConfiguration; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; import org.apache.phoenix.util.ColumnInfo; import com.google.common.base.Function; import com.google.common.base.Preconditions; -import com.google.common.base.Throwables; import com.google.common.collect.Lists; public final class SqlQueryToColumnInfoFunction implements Function<String,List<ColumnInfo>> { - - private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class); - private final PhoenixPigConfiguration phoenixConfiguration; + + private static final Log LOG = LogFactory.getLog(SqlQueryToColumnInfoFunction.class); + private final Configuration configuration; - public SqlQueryToColumnInfoFunction( - final PhoenixPigConfiguration phoenixPigConfiguration) { - super(); - this.phoenixConfiguration = phoenixPigConfiguration; - } + public SqlQueryToColumnInfoFunction(final Configuration configuration) { + this.configuration = configuration; + } - @Override - public List<ColumnInfo> apply(String sqlQuery) { - Preconditions.checkNotNull(sqlQuery); - Connection connection = null; - List<ColumnInfo> columnInfos = null; + @Override + public List<ColumnInfo> apply(String sqlQuery) { + Preconditions.checkNotNull(sqlQuery); + Connection connection = null; + List<ColumnInfo> columnInfos = null; try { - connection = this.phoenixConfiguration.getConnection(); + connection = ConnectionUtil.getConnection(this.configuration); final Statement statement = connection.createStatement(); final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); final QueryPlan queryPlan = pstmt.compileQuery(sqlQuery); final List<? extends ColumnProjector> projectedColumns = queryPlan.getProjector().getColumnProjectors(); columnInfos = Lists.newArrayListWithCapacity(projectedColumns.size()); columnInfos = Lists.transform(projectedColumns, new Function<ColumnProjector,ColumnInfo>() { - @Override - public ColumnInfo apply(final ColumnProjector columnProjector) { - return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType()); - } - + @Override + public ColumnInfo apply(final ColumnProjector columnProjector) { + return new ColumnInfo(columnProjector.getName(), columnProjector.getExpression().getDataType().getSqlType()); + } + }); - } catch (SQLException e) { + } catch (SQLException e) { LOG.error(String.format(" Error [%s] parsing SELECT query [%s] ",e.getMessage(),sqlQuery)); - Throwables.propagate(e); + throw new RuntimeException(e); } finally { if(connection != null) { try { connection.close(); } catch(SQLException sqle) { - Throwables.propagate(sqle); + LOG.error("Error closing connection!!"); + throw new RuntimeException(sqle); } } } - return columnInfos; - } + return columnInfos; + } } http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java index 1cdd66d..1da2d01 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/util/TypeUtil.java @@ -27,7 +27,7 @@ import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.phoenix.pig.hadoop.PhoenixRecord; +import org.apache.phoenix.pig.writable.PhoenixPigDBWritable; import org.apache.phoenix.schema.PDataType; import org.apache.pig.PigException; import org.apache.pig.ResourceSchema.ResourceFieldSchema; @@ -240,7 +240,7 @@ public final class TypeUtil { * @return * @throws IOException */ - public static Tuple transformToTuple(final PhoenixRecord record, final ResourceFieldSchema[] projectedColumns) throws IOException { + public static Tuple transformToTuple(final PhoenixPigDBWritable record, final ResourceFieldSchema[] projectedColumns) throws IOException { List<Object> columnValues = record.getValues(); if(columnValues == null || columnValues.size() == 0 || projectedColumns == null || projectedColumns.length != columnValues.size()) { http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java new file mode 100644 index 0000000..a7399c9 --- /dev/null +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/writable/PhoenixPigDBWritable.java @@ -0,0 +1,121 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.pig.writable; + +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.pig.util.TypeUtil; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.pig.ResourceSchema.ResourceFieldSchema; +import org.apache.pig.data.DataType; + +import com.google.common.base.Preconditions; + +/** + * A {@link Writable} representing a Phoenix record. This class + * a) does a type mapping and sets the value accordingly in the {@link PreparedStatement} + * b) reads the column values from the {@link ResultSet} + * + */ +public class PhoenixPigDBWritable implements DBWritable { + + private final List<Object> values; + private ResourceFieldSchema[] fieldSchemas; + private List<ColumnInfo> columnMetadataList; + + public PhoenixPigDBWritable() { + this.values = new ArrayList<Object>(); + } + + @Override + public void write(PreparedStatement statement) throws SQLException { + for (int i = 0; i < columnMetadataList.size(); i++) { + Object o = values.get(i); + ColumnInfo columnInfo = columnMetadataList.get(i); + byte type = (fieldSchemas == null) ? DataType.findType(o) : fieldSchemas[i].getType(); + try { + Object upsertValue = convertTypeSpecificValue(o, type, columnInfo.getSqlType()); + if (upsertValue != null) { + statement.setObject(i + 1, upsertValue, columnInfo.getSqlType()); + } else { + statement.setNull(i + 1, columnInfo.getSqlType()); + } + } catch (RuntimeException re) { + throw new RuntimeException(String.format("Unable to process column %s, innerMessage=%s" + ,columnInfo.toString(),re.getMessage()),re); + + } + } + } + + public void add(Object value) { + values.add(value); + } + + private Object convertTypeSpecificValue(Object o, byte type, Integer sqlType) { + PDataType pDataType = PDataType.fromTypeId(sqlType); + return TypeUtil.castPigTypeToPhoenix(o, type, pDataType); + } + + public List<Object> getValues() { + return values; + } + + @Override + public void readFields(final ResultSet rs) throws SQLException { + Preconditions.checkNotNull(rs); + final int noOfColumns = rs.getMetaData().getColumnCount(); + values.clear(); + for(int i = 1 ; i <= noOfColumns ; i++) { + Object obj = rs.getObject(i); + values.add(obj); + } + } + + public ResourceFieldSchema[] getFieldSchemas() { + return fieldSchemas; + } + + public void setFieldSchemas(ResourceFieldSchema[] fieldSchemas) { + this.fieldSchemas = fieldSchemas; + } + + public List<ColumnInfo> getColumnMetadataList() { + return columnMetadataList; + } + + public void setColumnMetadataList(List<ColumnInfo> columnMetadataList) { + this.columnMetadataList = columnMetadataList; + } + + public static PhoenixPigDBWritable newInstance(final ResourceFieldSchema[] fieldSchemas, + final List<ColumnInfo> columnMetadataList) { + final PhoenixPigDBWritable dbWritable = new PhoenixPigDBWritable (); + dbWritable.setFieldSchemas(fieldSchemas); + dbWritable.setColumnMetadataList(columnMetadataList); + return dbWritable; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java deleted file mode 100644 index ac254e6..0000000 --- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/PhoenixPigConfigurationTest.java +++ /dev/null @@ -1,86 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig; - -import static org.junit.Assert.assertEquals; - -import java.sql.SQLException; - -import org.apache.hadoop.conf.Configuration; -import org.junit.Test; - - -/** - * Tests for PhoenixPigConfiguration. - * - */ -public class PhoenixPigConfigurationTest { - - - @Test - public void testBasicConfiguration() throws SQLException { - Configuration conf = new Configuration(); - final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(conf); - final String zkQuorum = "localhost"; - final String tableName = "TABLE"; - final long batchSize = 100; - phoenixConfiguration.configure(zkQuorum, tableName, batchSize); - assertEquals(zkQuorum,phoenixConfiguration.getServer()); - assertEquals(tableName,phoenixConfiguration.getTableName()); - assertEquals(batchSize,phoenixConfiguration.getBatchSize()); - } - - /* @Test - public void testConfiguration() throws SQLException { - Configuration configuration = new Configuration(); - final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration); - final String zkQuorum = "localhost"; - final String tableName = "TABLE"; - final long batchSize = 100; - phoenixConfiguration.configure(zkQuorum, tableName, batchSize); - PhoenixPigConfigurationUtil util = Mockito.mock(PhoenixPigConfigurationUtil.class); - phoenixConfiguration.setUtil(util); - phoenixConfiguration.getColumnMetadataList(); - Mockito.verify(util).getUpsertColumnMetadataList(configuration, tableName); - Mockito.verifyNoMoreInteractions(util); - - phoenixConfiguration.getSelectStatement(); - Mockito.verify(util).getSelectStatement(configuration, tableName); - Mockito.verifyNoMoreInteractions(util); - } - - @Test - public void testWithSpy() throws SQLException { - Configuration configuration = new Configuration(); - final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration); - final String zkQuorum = "localhost"; - final String tableName = "TABLE"; - final long batchSize = 100; - phoenixConfiguration.configure(zkQuorum, tableName, batchSize); - phoenixConfiguration.setSelectStatement("SELECT 1 from TABLE"); - PhoenixPigConfigurationUtil util = new PhoenixPigConfigurationUtil(); - PhoenixPigConfigurationUtil spied = Mockito.spy(util); - phoenixConfiguration.setUtil(spied); - - phoenixConfiguration.getSelectStatement(); - Mockito.verify(spied,Mockito.times(1)).getSelectStatement(configuration, tableName); - Mockito.verifyNoMoreInteractions(util); - }*/ -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java deleted file mode 100644 index 9777bb5..0000000 --- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/ColumnInfoToStringEncoderDecoderTest.java +++ /dev/null @@ -1,61 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig.util; - -import static org.junit.Assert.assertEquals; - -import java.util.List; - -import org.apache.phoenix.pig.util.ColumnInfoToStringEncoderDecoder; -import org.apache.phoenix.schema.PDataType; -import org.apache.phoenix.util.ColumnInfo; -import org.junit.Test; - -import com.google.common.collect.Lists; - -/** - * Tests methods on {@link ColumnInfoToStringEncoderDecoder} - */ -public class ColumnInfoToStringEncoderDecoderTest { - - @Test - public void testEncode() { - final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); - final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo)); - assertEquals(columnInfo.toString(),encodedColumnInfo); - } - - @Test - public void testDecode() { - final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); - final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo)); - assertEquals(columnInfo.toString(),encodedColumnInfo); - } - - @Test - public void testEncodeDecodeWithNulls() { - final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); - final ColumnInfo columnInfo2 = null; - final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2)); - final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); - assertEquals(1,decodedColumnInfo.size()); - } - -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/f84e5da3/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java index 310128c..7a861b9 100644 --- a/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java +++ b/phoenix-pig/src/test/java/org/apache/phoenix/pig/util/PhoenixPigSchemaUtilTest.java @@ -29,8 +29,10 @@ import java.sql.SQLException; import java.sql.Types; import java.util.List; -import org.apache.phoenix.pig.PhoenixPigConfiguration; -import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil; +import org.apache.hadoop.conf.Configuration; +import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.schema.IllegalDataException; import org.apache.phoenix.util.ColumnInfo; import org.apache.pig.ResourceSchema; @@ -54,9 +56,11 @@ public class PhoenixPigSchemaUtilTest { @Test public void testSchema() throws SQLException, IOException { - final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class); + final Configuration configuration = mock(Configuration.class); final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,NAME_COLUMN); - when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos); + final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos); + when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos); + when(configuration.get(PhoenixConfigurationUtil.SCHEMA_TYPE)).thenReturn(SchemaType.TABLE.name()); final ResourceSchema actual = PhoenixPigSchemaUtil.getResourceSchema(configuration); // expected schema. @@ -75,9 +79,10 @@ public class PhoenixPigSchemaUtilTest { @Test(expected=IllegalDataException.class) public void testUnSupportedTypes() throws SQLException, IOException { - final PhoenixPigConfiguration configuration = mock(PhoenixPigConfiguration.class); + final Configuration configuration = mock(Configuration.class); final List<ColumnInfo> columnInfos = ImmutableList.of(ID_COLUMN,LOCATION_COLUMN); - when(configuration.getSelectColumnMetadataList()).thenReturn(columnInfos); + final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnInfos); + when(configuration.get(PhoenixConfigurationUtil.SELECT_COLUMN_INFO_KEY)).thenReturn(encodedColumnInfos); PhoenixPigSchemaUtil.getResourceSchema(configuration); fail("We currently don't support Array type yet. WIP!!"); }
