PHOENIX-1454 Map Reduce over Phoenix tables
Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/b70f3abd Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/b70f3abd Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/b70f3abd Branch: refs/heads/4.0 Commit: b70f3abd2f7e0d4393a5235a46de73f445ea51d1 Parents: 0de7863 Author: ravimagham <[email protected]> Authored: Mon Dec 8 18:02:57 2014 -0800 Committer: ravimagham <[email protected]> Committed: Mon Dec 8 18:02:57 2014 -0800 ---------------------------------------------------------------------- .../phoenix/mapreduce/PhoenixInputFormat.java | 117 +++++++ .../phoenix/mapreduce/PhoenixInputSplit.java | 129 +++++++ .../mapreduce/PhoenixOutputCommitter.java | 54 +++ .../phoenix/mapreduce/PhoenixOutputFormat.java | 62 ++++ .../phoenix/mapreduce/PhoenixRecordReader.java | 140 ++++++++ .../phoenix/mapreduce/PhoenixRecordWriter.java | 91 +++++ .../util/ColumnInfoToStringEncoderDecoder.java | 65 ++++ .../phoenix/mapreduce/util/ConnectionUtil.java | 49 +++ .../util/PhoenixConfigurationUtil.java | 299 ++++++++++++++++ .../mapreduce/util/PhoenixMapReduceUtil.java | 99 ++++++ .../org/apache/phoenix/util/PhoenixRuntime.java | 6 +- .../java/org/apache/phoenix/util/QueryUtil.java | 22 +- .../ColumnInfoToStringEncoderDecoderTest.java | 61 ++++ .../util/PhoenixConfigurationUtilTest.java | 124 +++++++ .../org/apache/phoenix/util/QueryUtilTest.java | 2 +- .../phoenix/pig/PhoenixPigConfigurationIT.java | 109 ------ .../apache/phoenix/pig/PhoenixHBaseLoader.java | 45 +-- .../apache/phoenix/pig/PhoenixHBaseStorage.java | 218 ++++++------ .../phoenix/pig/PhoenixPigConfiguration.java | 340 ------------------- .../phoenix/pig/hadoop/PhoenixInputFormat.java | 142 -------- .../phoenix/pig/hadoop/PhoenixInputSplit.java | 134 -------- .../pig/hadoop/PhoenixOutputCommitter.java | 111 ------ .../phoenix/pig/hadoop/PhoenixOutputFormat.java | 94 ----- .../phoenix/pig/hadoop/PhoenixRecord.java | 112 ------ 
.../phoenix/pig/hadoop/PhoenixRecordReader.java | 142 -------- .../phoenix/pig/hadoop/PhoenixRecordWriter.java | 83 ----- .../util/ColumnInfoToStringEncoderDecoder.java | 69 ---- .../phoenix/pig/util/PhoenixPigSchemaUtil.java | 27 +- .../pig/util/QuerySchemaParserFunction.java | 21 +- .../pig/util/SqlQueryToColumnInfoFunction.java | 51 ++- .../org/apache/phoenix/pig/util/TypeUtil.java | 4 +- .../pig/writable/PhoenixPigDBWritable.java | 121 +++++++ .../pig/PhoenixPigConfigurationTest.java | 49 --- .../ColumnInfoToStringEncoderDecoderTest.java | 61 ---- .../pig/util/PhoenixPigSchemaUtilTest.java | 17 +- .../pig/util/QuerySchemaParserFunctionTest.java | 22 +- .../util/SqlQueryToColumnInfoFunctionTest.java | 22 +- 37 files changed, 1642 insertions(+), 1672 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java new file mode 100644 index 0000000..7c67c2c --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputFormat.java @@ -0,0 +1,117 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputFormat; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.jdbc.PhoenixStatement; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.query.KeyRange; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * {@link InputFormat} implementation from Phoenix. 
+ * + */ +public class PhoenixInputFormat<T extends DBWritable> extends InputFormat<NullWritable,T> { + + private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class); + + /** + * instantiated by framework + */ + public PhoenixInputFormat() { + } + + /** + * Compiles the configured select statement into a query plan and wraps it in a + * {@link PhoenixRecordReader} for the input class named in the configuration. + */ + @Override + public RecordReader<NullWritable,T> createRecordReader(InputSplit split, TaskAttemptContext context) + throws IOException, InterruptedException { + + final Configuration configuration = context.getConfiguration(); + final QueryPlan queryPlan = getQueryPlan(context,configuration); + @SuppressWarnings("unchecked") + final Class<T> inputClass = (Class<T>) PhoenixConfigurationUtil.getInputClass(configuration); + return new PhoenixRecordReader<T>(inputClass , configuration, queryPlan); + } + + + + /** + * Returns one {@link PhoenixInputSplit} for every scan list of the query plan. + */ + @Override + public List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException { + final Configuration configuration = context.getConfiguration(); + final QueryPlan queryPlan = getQueryPlan(context,configuration); + final List<KeyRange> allSplits = queryPlan.getSplits(); + final List<InputSplit> splits = generateSplits(queryPlan,allSplits); + return splits; + } + + /** + * Wraps every scan list reported by {@code qplan.getScans()} in a {@link PhoenixInputSplit}. + * {@code splits} is only used to pre-size the result list. + */ + private List<InputSplit> generateSplits(final QueryPlan qplan, final List<KeyRange> splits) throws IOException { + Preconditions.checkNotNull(qplan); + Preconditions.checkNotNull(splits); + final List<InputSplit> psplits = Lists.newArrayListWithExpectedSize(splits.size()); + for (List<Scan> scans : qplan.getScans()) { + psplits.add(new PhoenixInputSplit(scans)); + } + return psplits; + } + + /** + * Returns the optimized query plan for the select statement stored in the configuration.
+ * @param context job context; must not be null + * @param configuration configuration used to open the connection and to look up the select statement + * @return the optimized {@link QueryPlan}, after {@code iterator()} has been called on it + * @throws RuntimeException wrapping any failure raised while connecting or compiling the query + */ + private QueryPlan getQueryPlan(final JobContext context,final Configuration configuration) throws IOException { + Preconditions.checkNotNull(context); + try{ + // NOTE(review): the connection and statement are never closed here; confirm whether the + // returned plan still depends on them before adding a close. + final Connection connection = ConnectionUtil.getConnection(configuration); + final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); + Preconditions.checkNotNull(selectStatement); + final Statement statement = connection.createStatement(); + final PhoenixStatement pstmt = statement.unwrap(PhoenixStatement.class); + // Optimize the query plan so that we potentially use secondary indexes + final QueryPlan queryPlan = pstmt.optimizeQuery(selectStatement); + // Initialize the query plan so it sets up the parallel scans + queryPlan.iterator(); + return queryPlan; + } catch(Exception exception) { + // Hand the exception to the logger so the stack trace is kept, not only the message. + LOG.error(String.format("Failed to get the query plan with error [%s]",exception.getMessage()), exception); + throw new RuntimeException(exception); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java new file mode 100644 index 0000000..b222fc9 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixInputSplit.java @@ -0,0 +1,129 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; +import java.util.List; + +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.hbase.protobuf.ProtobufUtil; +import org.apache.hadoop.hbase.protobuf.generated.ClientProtos; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableUtils; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.phoenix.query.KeyRange; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; + +/** + * Input split class to hold the lower and upper bound range. 
{@link KeyRange} + */ +public class PhoenixInputSplit extends InputSplit implements Writable { + + private List<Scan> scans; + private KeyRange keyRange; + + /** + * No Arg constructor; {@code scans} and {@code keyRange} stay null until {@link #readFields(DataInput)} runs. + */ + public PhoenixInputSplit() { + } + + /** + * Creates a split over the given scans and derives its key range from them. + * @param scans non-null, non-empty list of scans covered by this split + */ + public PhoenixInputSplit(final List<Scan> scans) { + Preconditions.checkNotNull(scans); + Preconditions.checkState(!scans.isEmpty()); + this.scans = scans; + init(); + } + + public List<Scan> getScans() { + return scans; + } + + public KeyRange getKeyRange() { + return keyRange; + } + + /** + * Derives the key range from the start row of the first scan and the stop row of the last scan. + */ + private void init() { + this.keyRange = KeyRange.getKeyRange(scans.get(0).getStartRow(), scans.get(scans.size()-1).getStopRow()); + } + + /** + * Reads a scan count followed by that many length-prefixed protobuf scans, then recomputes the key range. + */ + @Override + public void readFields(DataInput input) throws IOException { + int count = WritableUtils.readVInt(input); + scans = Lists.newArrayListWithExpectedSize(count); + for (int i = 0; i < count; i++) { + byte[] protoScanBytes = new byte[WritableUtils.readVInt(input)]; + input.readFully(protoScanBytes); + ClientProtos.Scan protoScan = ClientProtos.Scan.parseFrom(protoScanBytes); + Scan scan = ProtobufUtil.toScan(protoScan); + scans.add(scan); + } + init(); + } + + /** + * Writes the scan count followed by each scan as a length-prefixed protobuf byte array. + */ + @Override + public void write(DataOutput output) throws IOException { + Preconditions.checkNotNull(scans); + WritableUtils.writeVInt(output, scans.size()); + for (Scan scan : scans) { + ClientProtos.Scan protoScan = ProtobufUtil.toScan(scan); + byte[] protoScanBytes = protoScan.toByteArray(); + WritableUtils.writeVInt(output, protoScanBytes.length); + output.write(protoScanBytes); + } + } + + @Override + public long getLength() throws IOException, InterruptedException { + return 0; + } + + @Override + public String[] getLocations() throws IOException, InterruptedException { + return new String[]{}; + } + + @Override + public int hashCode() { + final int prime = 31; + int result = 1; + // keyRange is null on an instance built with the no-arg constructor; equals() already tolerates that. + result = prime * result + (keyRange == null ? 0 : keyRange.hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { 
return true; } + if (obj == null) { return false; } + if (!(obj instanceof PhoenixInputSplit)) { return false; } + PhoenixInputSplit other = (PhoenixInputSplit)obj; + if (keyRange == null) { + if (other.keyRange != null) { return false; } + } else if (!keyRange.equals(other.keyRange)) { return false; } + return true; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java new file mode 100644 index 0000000..ffee5c7 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputCommitter.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; + +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * A no-op {@link OutputCommitter}. + * <p> + * {@code setupJob}, {@code setupTask}, {@code commitTask} and {@code abortTask} have empty bodies; + * {@code needsTaskCommit} always returns {@code true}. + */ +public class PhoenixOutputCommitter extends OutputCommitter { + + public PhoenixOutputCommitter() {} + + @Override + public void abortTask(TaskAttemptContext context) throws IOException { + } + + @Override + public void commitTask(TaskAttemptContext context) throws IOException { + } + + @Override + public boolean needsTaskCommit(TaskAttemptContext context) throws IOException { + return true; + } + + @Override + public void setupJob(JobContext jobContext) throws IOException { + } + + @Override + public void setupTask(TaskAttemptContext context) throws IOException { + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java new file mode 100644 index 0000000..e55b977 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixOutputFormat.java @@ -0,0 +1,62 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.OutputFormat; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; + +/** + * {@link OutputFormat} implementation for Phoenix. 
+ * + */ +public class PhoenixOutputFormat <T extends DBWritable> extends OutputFormat<NullWritable,T> { + private static final Log LOG = LogFactory.getLog(PhoenixOutputFormat.class); + + @Override + public void checkOutputSpecs(JobContext jobContext) throws IOException, InterruptedException { + } + + /** + * + */ + @Override + public OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException { + return new PhoenixOutputCommitter(); + } + + @Override + public RecordWriter<NullWritable, T> getRecordWriter(TaskAttemptContext context) throws IOException, InterruptedException { + try { + return new PhoenixRecordWriter<T>(context.getConfiguration()); + } catch (SQLException e) { + LOG.error("Error calling PhoenixRecordWriter " + e.getMessage()); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java new file mode 100644 index 0000000..2c206ab --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordReader.java @@ -0,0 +1,140 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.client.Scan; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.phoenix.compile.QueryPlan; +import org.apache.phoenix.iterate.ConcatResultIterator; +import org.apache.phoenix.iterate.LookAheadResultIterator; +import org.apache.phoenix.iterate.PeekingResultIterator; +import org.apache.phoenix.iterate.ResultIterator; +import org.apache.phoenix.iterate.SequenceResultIterator; +import org.apache.phoenix.iterate.TableResultIterator; +import org.apache.phoenix.jdbc.PhoenixResultSet; + +import com.google.common.base.Preconditions; +import com.google.common.base.Throwables; +import com.google.common.collect.Lists; + +/** + * {@link RecordReader} implementation that iterates over the records. 
+ */ +public class PhoenixRecordReader<T extends DBWritable> extends RecordReader<NullWritable,T> { + + private static final Log LOG = LogFactory.getLog(PhoenixRecordReader.class); + private final Configuration configuration; + private final QueryPlan queryPlan; + private NullWritable key = NullWritable.get(); + private T value = null; + private Class<T> inputClass; + private ResultIterator resultIterator = null; + private PhoenixResultSet resultSet; + + public PhoenixRecordReader(Class<T> inputClass,final Configuration configuration,final QueryPlan queryPlan) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(queryPlan); + this.inputClass = inputClass; + this.configuration = configuration; + this.queryPlan = queryPlan; + } + + @Override + public void close() throws IOException { + if(resultIterator != null) { + try { + resultIterator.close(); + } catch (SQLException e) { + LOG.error(" Error closing resultset."); + throw new RuntimeException(e); + } + } + } + + @Override + public NullWritable getCurrentKey() throws IOException, InterruptedException { + return key; + } + + @Override + public T getCurrentValue() throws IOException, InterruptedException { + return value; + } + + @Override + public float getProgress() throws IOException, InterruptedException { + return 0; + } + + @Override + public void initialize(InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException { + final PhoenixInputSplit pSplit = (PhoenixInputSplit)split; + final List<Scan> scans = pSplit.getScans(); + try { + List<PeekingResultIterator> iterators = Lists.newArrayListWithExpectedSize(scans.size()); + for (Scan scan : scans) { + final TableResultIterator tableResultIterator = new TableResultIterator(queryPlan.getContext(), queryPlan.getTableRef(),scan); + PeekingResultIterator peekingResultIterator = LookAheadResultIterator.wrap(tableResultIterator); + iterators.add(peekingResultIterator); + } + ResultIterator iterator = 
ConcatResultIterator.newIterator(iterators); + if(queryPlan.getContext().getSequenceManager().getSequenceCount() > 0) { + iterator = new SequenceResultIterator(iterator, queryPlan.getContext().getSequenceManager()); + } + this.resultIterator = iterator; + this.resultSet = new PhoenixResultSet(this.resultIterator, queryPlan.getProjector(),queryPlan.getContext().getStatement()); + } catch (SQLException e) { + LOG.error(String.format(" Error [%s] initializing PhoenixRecordReader. ",e.getMessage())); + Throwables.propagate(e); + } + } + + @Override + public boolean nextKeyValue() throws IOException, InterruptedException { + if (key == null) { + key = NullWritable.get(); + } + if (value == null) { + value = ReflectionUtils.newInstance(inputClass, this.configuration); + } + Preconditions.checkNotNull(this.resultSet); + try { + if(!resultSet.next()) { + return false; + } + value.readFields(resultSet); + return true; + } catch (SQLException e) { + LOG.error(String.format(" Error [%s] occurred while iterating over the resultset. ",e.getMessage())); + throw new RuntimeException(e); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java new file mode 100644 index 0000000..4d26bf4 --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/PhoenixRecordWriter.java @@ -0,0 +1,91 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.SQLException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.mapreduce.util.ConnectionUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; + +/** + * Default {@link RecordWriter} implementation from Phoenix + * + */ +public class PhoenixRecordWriter<T extends DBWritable> extends RecordWriter<NullWritable, T> { + + private static final Log LOG = LogFactory.getLog(PhoenixRecordWriter.class); + + private final Connection conn; + private final PreparedStatement statement; + private final long batchSize; + private long numRecords = 0; + + public PhoenixRecordWriter(final Configuration configuration) throws SQLException { + this.conn = ConnectionUtil.getConnection(configuration); + this.batchSize = PhoenixConfigurationUtil.getBatchSize(configuration); + final String upsertQuery = PhoenixConfigurationUtil.getUpsertStatement(configuration); + this.statement = this.conn.prepareStatement(upsertQuery); + } + + 
@Override + public void close(TaskAttemptContext context) throws IOException, InterruptedException { + try { + statement.executeBatch(); + conn.commit(); + } catch (SQLException e) { + LOG.error("SQLException while performing the commit for the task."); + throw new RuntimeException(e); + } finally { + try { + statement.close(); + conn.close(); + } + catch (SQLException ex) { + LOG.error("SQLException while closing the connection for the task."); + throw new RuntimeException(ex); + } + } + } + + @Override + public void write(NullWritable n, T record) throws IOException, InterruptedException { + try { + record.write(statement); + numRecords++; + statement.addBatch(); + if (numRecords % batchSize == 0) { + LOG.debug("commit called on a batch of size : " + batchSize); + statement.executeBatch(); + conn.commit(); + } + } catch (SQLException e) { + throw new RuntimeException("Exception while committing to database.", e); + } + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java new file mode 100644 index 0000000..ec52fba --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoder.java @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.util; + +import java.util.List; + +import org.apache.phoenix.util.ColumnInfo; + +import com.google.common.base.Function; +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Iterables; +import com.google.common.collect.Lists; + +/** + * A codec to transform a {@link ColumnInfo} to a {@link String} and decode back. + * <p> + * {@code encode} joins the non-null entries of the list with the {@code |} delimiter; + * {@code decode} splits on the same delimiter, drops empty segments and rebuilds each + * entry with {@code ColumnInfo.fromString}. Both methods reject a {@code null} argument. + */ +public class ColumnInfoToStringEncoderDecoder { + + private static final String COLUMN_INFO_DELIMITER = "|"; + + private ColumnInfoToStringEncoderDecoder() { + + } + + public static String encode(List<ColumnInfo> columnInfos) { + Preconditions.checkNotNull(columnInfos); + return Joiner.on(COLUMN_INFO_DELIMITER) + .skipNulls() + .join(columnInfos); + } + + public static List<ColumnInfo> decode(final String columnInfoStr) { + Preconditions.checkNotNull(columnInfoStr); + List<ColumnInfo> columnInfos = Lists.newArrayList( + Iterables.transform( + Splitter.on(COLUMN_INFO_DELIMITER).omitEmptyStrings().split(columnInfoStr), + new Function<String, ColumnInfo>() { + @Override + public ColumnInfo apply(String colInfo) { + return ColumnInfo.fromString(colInfo); + } + })); + return columnInfos; + + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java ---------------------------------------------------------------------- diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java new file mode 100644 index 0000000..0864cba --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/ConnectionUtil.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.util; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.Properties; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.util.QueryUtil; + +import com.google.common.base.Preconditions; + +/** + * Utility class to return a {@link Connection} . 
+ */ +public class ConnectionUtil { + + /** + * Returns a new {@link Connection} obtained from {@link DriverManager}, using the URL that + * {@code QueryUtil.getUrl} builds from the configuration's {@code HConstants.ZOOKEEPER_QUORUM} + * value and an empty {@link Properties} object. + * @param configuration non-null configuration holding the ZooKeeper quorum + * @return the connection returned by {@code DriverManager.getConnection} + * @throws SQLException if {@code DriverManager.getConnection} fails + */ + public static Connection getConnection(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + final Properties props = new Properties(); + final Connection conn = DriverManager.getConnection(QueryUtil.getUrl(configuration.get(HConstants.ZOOKEEPER_QUORUM)), props); + return conn; + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java new file mode 100644 index 0000000..83a606b --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtil.java @@ -0,0 +1,299 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.util; + +import static org.apache.commons.lang.StringUtils.isNotEmpty; + +import java.sql.Connection; +import java.sql.SQLException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.lib.db.DBInputFormat.NullDBWritable; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PhoenixRuntime; +import org.apache.phoenix.util.QueryUtil; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; +import com.google.common.collect.Lists; + +/** + * A utility class to set properties on the {#link Configuration} instance. + * Used as part of Map Reduce job configuration. 
+ * + */ +public final class PhoenixConfigurationUtil { + + private static final Log LOG = LogFactory.getLog(PhoenixInputFormat.class); + + public static final String UPSERT_COLUMNS = "phoenix.upsert.columns"; + + public static final String UPSERT_STATEMENT = "phoenix.upsert.stmt"; + + public static final String UPSERT_COLUMN_INFO_KEY = "phoenix.upsert.columninfos.list"; + + public static final String SELECT_STATEMENT = "phoenix.select.stmt"; + + public static final String UPSERT_BATCH_SIZE = "phoenix.upsert.batch.size"; + + public static final String SELECT_COLUMNS = "phoneix.select.query.columns"; + + public static final String SELECT_COLUMN_INFO_KEY = "phoenix.select.columninfos.list"; + + public static final String SCHEMA_TYPE = "phoenix.select.schema.type"; + + public static final String COLUMN_NAMES_DELIMITER = "phoenix.column.names.delimiter"; + + public static final String INPUT_TABLE_NAME = "phoenix.input.table.name" ; + + public static final String INPUT_TABLE_CONDITIONS = "phoenix.input.table.conditions" ; + + public static final String OUTPUT_TABLE_NAME = "phoenix.output.table.name" ; + + public static final long DEFAULT_UPSERT_BATCH_SIZE = 1000; + + public static final String DEFAULT_COLUMN_NAMES_DELIMITER = ","; + + public static final String INPUT_CLASS = "phoenix.input.class"; + + public enum SchemaType { + TABLE, + QUERY; + } + + private PhoenixConfigurationUtil(){ + + } + /** + * + * @param tableName + */ + public static void setInputTableName(final Configuration configuration, final String tableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(tableName); + configuration.set(INPUT_TABLE_NAME, tableName); + } + + public static void setInputTableConditions(final Configuration configuration, final String conditions) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(conditions); + configuration.set(INPUT_TABLE_CONDITIONS, conditions); + } + + public static void setSelectColumnNames(final 
Configuration configuration,final String[] columns) { + Preconditions.checkNotNull(configuration); + final String selectColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns); + configuration.set(SELECT_COLUMNS, selectColumnNames); + } + + public static void setSelectColumnNames(final Configuration configuration,final String columns) { + Preconditions.checkNotNull(configuration); + configuration.set(SELECT_COLUMNS, columns); + } + + public static void setInputClass(final Configuration configuration, Class<? extends DBWritable> inputClass) { + Preconditions.checkNotNull(configuration); + configuration.setClass(INPUT_CLASS ,inputClass,DBWritable.class); + } + + public static void setInputQuery(final Configuration configuration, final String inputQuery) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(inputQuery); + configuration.set(SELECT_STATEMENT, inputQuery); + } + + public static void setSchemaType(Configuration configuration, final SchemaType schemaType) { + Preconditions.checkNotNull(configuration); + configuration.set(SCHEMA_TYPE, schemaType.name()); + } + + public static void setOutputTableName(final Configuration configuration, final String tableName) { + Preconditions.checkNotNull(configuration); + Preconditions.checkNotNull(tableName); + configuration.set(OUTPUT_TABLE_NAME, tableName); + } + + public static void setUpsertColumnNames(final Configuration configuration,final String[] columns) { + Preconditions.checkNotNull(configuration); + final String upsertColumnNames = Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(columns); + configuration.set(UPSERT_COLUMNS, upsertColumnNames); + } + + public static void setUpsertColumnNames(final Configuration configuration,final String columns) { + Preconditions.checkNotNull(configuration); + configuration.set(UPSERT_COLUMNS, columns); + } + + + public static void setBatchSize(final Configuration configuration, final Long batchSize) { + 
Preconditions.checkNotNull(configuration); + configuration.setLong(UPSERT_BATCH_SIZE, batchSize); + } + + public static Class<?> getInputClass(final Configuration configuration) { + return configuration.getClass(INPUT_CLASS, NullDBWritable.class); + } + public static SchemaType getSchemaType(final Configuration configuration) { + final String schemaTp = configuration.get(SCHEMA_TYPE); + Preconditions.checkNotNull(schemaTp); + return SchemaType.valueOf(schemaTp); + } + + public static List<ColumnInfo> getUpsertColumnMetadataList(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + final String tableName = getOutputTableName(configuration); + Preconditions.checkNotNull(tableName); + final String columnInfoStr = configuration.get(UPSERT_COLUMN_INFO_KEY); + if(isNotEmpty(columnInfoStr)) { + return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); + } + final Connection connection = ConnectionUtil.getConnection(configuration); + String upsertColumns = configuration.get(UPSERT_COLUMNS); + List<String> upsertColumnList = null; + if(isNotEmpty(upsertColumns)) { + final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER); + upsertColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(upsertColumns)); + LOG.info(String.format("UseUpsertColumns=%s, upsertColumns=%s, upsertColumnSet.size()=%s, parsedColumns=%s " + ,!upsertColumnList.isEmpty(),upsertColumns, upsertColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(upsertColumnList) + )); + } + List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, upsertColumnList); + final String encodedColumnInfos = ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); + // we put the encoded column infos in the Configuration for re usability. 
+ configuration.set(UPSERT_COLUMN_INFO_KEY, encodedColumnInfos); + connection.close(); + return columnMetadataList; + } + + public static String getUpsertStatement(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + final String tableName = getOutputTableName(configuration); + Preconditions.checkNotNull(tableName); + String upsertStmt = configuration.get(UPSERT_STATEMENT); + if(isNotEmpty(upsertStmt)) { + return upsertStmt; + } + final boolean useUpsertColumns = isNotEmpty(configuration.get(UPSERT_COLUMNS,"")); + final List<ColumnInfo> columnMetadataList = getUpsertColumnMetadataList(configuration); + if (useUpsertColumns) { + // Generating UPSERT statement without column name information. + upsertStmt = QueryUtil.constructUpsertStatement(tableName, columnMetadataList); + LOG.info("Phoenix Custom Upsert Statement: "+ upsertStmt); + } else { + // Generating UPSERT statement without column name information. + upsertStmt = QueryUtil.constructGenericUpsertStatement(tableName, columnMetadataList.size()); + LOG.info("Phoenix Generic Upsert Statement: " + upsertStmt); + } + configuration.set(UPSERT_STATEMENT, upsertStmt); + return upsertStmt; + + } + + public static List<ColumnInfo> getSelectColumnMetadataList(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + final String columnInfoStr = configuration.get(SELECT_COLUMN_INFO_KEY); + if(isNotEmpty(columnInfoStr)) { + return ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); + } + final String tableName = getInputTableName(configuration); + Preconditions.checkNotNull(tableName); + final Connection connection = ConnectionUtil.getConnection(configuration); + final List<String> selectColumnList = getSelectColumnList(configuration); + final List<ColumnInfo> columnMetadataList = PhoenixRuntime.generateColumnInfo(connection, tableName, selectColumnList); + final String encodedColumnInfos = 
ColumnInfoToStringEncoderDecoder.encode(columnMetadataList); + // we put the encoded column infos in the Configuration for re usability. + configuration.set(SELECT_COLUMN_INFO_KEY, encodedColumnInfos); + connection.close(); + return columnMetadataList; + } + + private static List<String> getSelectColumnList( + final Configuration configuration) { + String selectColumns = configuration.get(SELECT_COLUMNS); + List<String> selectColumnList = null; + if(isNotEmpty(selectColumns)) { + final String columnNamesDelimiter = configuration.get(COLUMN_NAMES_DELIMITER, DEFAULT_COLUMN_NAMES_DELIMITER); + selectColumnList = Lists.newArrayList(Splitter.on(columnNamesDelimiter).omitEmptyStrings().trimResults().split(selectColumns)); + LOG.info(String.format("UseSelectColumns=%s, selectColumns=%s, selectColumnSet.size()=%s, parsedColumns=%s " + ,!selectColumnList.isEmpty(),selectColumns, selectColumnList.size(), Joiner.on(DEFAULT_COLUMN_NAMES_DELIMITER).join(selectColumnList) + )); + } + return selectColumnList; + } + + public static String getSelectStatement(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + String selectStmt = configuration.get(SELECT_STATEMENT); + if(isNotEmpty(selectStmt)) { + return selectStmt; + } + final String tableName = getInputTableName(configuration); + Preconditions.checkNotNull(tableName); + final List<ColumnInfo> columnMetadataList = getSelectColumnMetadataList(configuration); + final String conditions = configuration.get(INPUT_TABLE_CONDITIONS); + selectStmt = QueryUtil.constructSelectStatement(tableName, columnMetadataList, conditions); + LOG.info("Select Statement: "+ selectStmt); + configuration.set(SELECT_STATEMENT, selectStmt); + return selectStmt; + } + + public static long getBatchSize(final Configuration configuration) throws SQLException { + Preconditions.checkNotNull(configuration); + long batchSize = configuration.getLong(UPSERT_BATCH_SIZE, DEFAULT_UPSERT_BATCH_SIZE); + if(batchSize <= 
0) { + Connection conn = ConnectionUtil.getConnection(configuration); + batchSize = ((PhoenixConnection) conn).getMutateBatchSize(); + conn.close(); + } + configuration.setLong(UPSERT_BATCH_SIZE, batchSize); + return batchSize; + } + + public static int getSelectColumnsCount(Configuration configuration, + String tableName) throws SQLException { + Preconditions.checkNotNull(configuration); + final String schemaTp = configuration.get(SCHEMA_TYPE); + final SchemaType schemaType = SchemaType.valueOf(schemaTp); + int count = 0; + if(SchemaType.QUERY.equals(schemaType)) { + List<String> selectedColumnList = getSelectColumnList(configuration); + count = selectedColumnList == null ? 0 : selectedColumnList.size(); + } else { + List<ColumnInfo> columnInfos = getSelectColumnMetadataList(configuration); + count = columnInfos == null ? 0 : columnInfos.size(); + } + return count; + } + + public static String getInputTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(INPUT_TABLE_NAME); + } + + public static String getOutputTableName(Configuration configuration) { + Preconditions.checkNotNull(configuration); + return configuration.get(OUTPUT_TABLE_NAME); + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java new file mode 100644 index 0000000..f1a7f5a --- /dev/null +++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/util/PhoenixMapReduceUtil.java @@ -0,0 +1,99 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.phoenix.mapreduce.util; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.db.DBWritable; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.PhoenixOutputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; + +/** + * Utility class for setting Configuration parameters for the Map Reduce job + */ +public final class PhoenixMapReduceUtil { + + private PhoenixMapReduceUtil() { + + } + + /** + * + * @param job + * @param inputClass DBWritable class + * @param tableName Input table name + * @param conditions Condition clause to be added to the WHERE clause. + * @param fieldNames fields being projected for the SELECT query. + */ + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName , final String conditions, final String... 
fieldNames) { + job.setInputFormatClass(PhoenixInputFormat.class); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setSelectColumnNames(configuration,fieldNames); + PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.TABLE); + } + + /** + * + * @param job + * @param inputClass DBWritable class + * @param tableName Input table name + * @param inputQuery Select query. + */ + public static void setInput(final Job job, final Class<? extends DBWritable> inputClass, final String tableName, final String inputQuery) { + job.setInputFormatClass(PhoenixInputFormat.class); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setInputQuery(configuration, inputQuery); + PhoenixConfigurationUtil.setInputClass(configuration,inputClass); + PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); + } + + /** + * + * @param job + * @param outputClass + * @param tableName Output table + * @param columns List of columns separated by , + */ + public static void setOutput(final Job job, final String tableName,final String columns) { + job.setOutputFormatClass(PhoenixOutputFormat.class); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); + PhoenixConfigurationUtil.setUpsertColumnNames(configuration,columns); + } + + + /** + * + * @param job + * @param outputClass + * @param tableName Output table + * @param fieldNames fields + */ + public static void setOutput(final Job job, final String tableName , final String... 
fieldNames) { + job.setOutputFormatClass(PhoenixOutputFormat.class); + final Configuration configuration = job.getConfiguration(); + PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); + PhoenixConfigurationUtil.setUpsertColumnNames(configuration,fieldNames); + } + + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java index f7abe7e..2dc4029 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/PhoenixRuntime.java @@ -325,8 +325,8 @@ public class PhoenixRuntime { if (columns == null) { // use all columns in the table for(PColumn pColumn : table.getColumns()) { - int sqlType = pColumn.getDataType().getResultSetSqlType(); - columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType)); + int sqlType = pColumn.getDataType().getSqlType(); + columnInfoList.add(new ColumnInfo(pColumn.toString(), sqlType)); } } else { // Leave "null" as indication to skip b/c it doesn't exist @@ -405,7 +405,7 @@ public class PhoenixRuntime { if (pColumn==null) { throw new SQLException("pColumn must not be null."); } - int sqlType = pColumn.getDataType().getResultSetSqlType(); + int sqlType = pColumn.getDataType().getSqlType(); ColumnInfo columnInfo = new ColumnInfo(pColumn.toString(),sqlType); return columnInfo; } http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java index af77001..b91fddc 
100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/QueryUtil.java @@ -145,29 +145,33 @@ public final class QueryUtil { * * @param fullTableName name of the table for which the select statement needs to be created. * @param columnInfos list of columns to be projected in the select statement. + * @param conditions The condition clause to be added to the WHERE condition * @return Select Query */ - public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos) { + public static String constructSelectStatement(String fullTableName, List<ColumnInfo> columnInfos,final String conditions) { Preconditions.checkNotNull(fullTableName,"Table name cannot be null"); if(columnInfos == null || columnInfos.isEmpty()) { throw new IllegalArgumentException("At least one column must be provided"); } // escape the table name to ensure it is case sensitive. final String escapedFullTableName = SchemaUtil.getEscapedFullTableName(fullTableName); - StringBuilder sb = new StringBuilder(); - sb.append("SELECT "); + StringBuilder query = new StringBuilder(); + query.append("SELECT "); for (ColumnInfo cinfo : columnInfos) { if (cinfo != null) { String fullColumnName = getEscapedFullColumnName(cinfo.getColumnName()); - sb.append(fullColumnName); - sb.append(","); + query.append(fullColumnName); + query.append(","); } } // Remove the trailing comma - sb.setLength(sb.length() - 1); - sb.append(" FROM "); - sb.append(escapedFullTableName); - return sb.toString(); + query.setLength(query.length() - 1); + query.append(" FROM "); + query.append(escapedFullTableName); + if(conditions != null && conditions.length() > 0) { + query.append(" WHERE (").append(conditions).append(")"); + } + return query.toString(); } public static String getUrl(String server) { 
http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java new file mode 100644 index 0000000..1004981 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/ColumnInfoToStringEncoderDecoderTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.util; + +import static org.junit.Assert.assertEquals; + +import java.util.List; + +import org.apache.phoenix.mapreduce.util.ColumnInfoToStringEncoderDecoder; +import org.apache.phoenix.schema.PDataType; +import org.apache.phoenix.util.ColumnInfo; +import org.junit.Test; + +import com.google.common.collect.Lists; + +/** + * Tests methods on {@link ColumnInfoToStringEncoderDecoder} + */ +public class ColumnInfoToStringEncoderDecoderTest { + + @Test + public void testEncode() { + final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); + final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo)); + assertEquals(columnInfo.toString(),encodedColumnInfo); + } + + @Test + public void testDecode() { + final ColumnInfo columnInfo = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); + final String encodedColumnInfo = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo)); + assertEquals(columnInfo.toString(),encodedColumnInfo); + } + + @Test + public void testEncodeDecodeWithNulls() { + final ColumnInfo columnInfo1 = new ColumnInfo("col1", PDataType.VARCHAR.getSqlType()); + final ColumnInfo columnInfo2 = null; + final String columnInfoStr = ColumnInfoToStringEncoderDecoder.encode(Lists.newArrayList(columnInfo1,columnInfo2)); + final List<ColumnInfo> decodedColumnInfo = ColumnInfoToStringEncoderDecoder.decode(columnInfoStr); + assertEquals(1,decodedColumnInfo.size()); + } + +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java new file mode 100644 index 
0000000..33c7531 --- /dev/null +++ b/phoenix-core/src/test/java/org/apache/phoenix/mapreduce/util/PhoenixConfigurationUtilTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2010 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + *distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you maynot use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicablelaw or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.phoenix.mapreduce.util; + +import static org.junit.Assert.assertEquals; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.util.List; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hbase.HConstants; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; +import org.apache.phoenix.query.BaseConnectionlessQueryTest; +import org.apache.phoenix.util.ColumnInfo; +import org.apache.phoenix.util.PropertiesUtil; +import org.apache.phoenix.util.SchemaUtil; +import org.apache.phoenix.util.TestUtil; +import org.junit.Test; + +/** + * Test for {@link PhoenixConfigurationUtil} + */ +public class PhoenixConfigurationUtilTest extends BaseConnectionlessQueryTest { + + @Test + public void testUpsertStatement() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + final String tableName = "TEST_TABLE"; + try { + String ddl = "CREATE TABLE "+ tableName + + " (a_string varchar not null, a_binary varbinary not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; + conn.createStatement().execute(ddl); + final Configuration configuration = new Configuration (); + configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); + PhoenixConfigurationUtil.setOutputTableName(configuration, tableName); + final String upserStatement = PhoenixConfigurationUtil.getUpsertStatement(configuration); + final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)"; + assertEquals(expectedUpsertStatement, upserStatement); + } finally { + conn.close(); + } + } + + @Test + public void testSelectStatement() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + final String tableName = "TEST_TABLE"; + try { + String ddl = "CREATE TABLE "+ tableName + + " (a_string varchar not null, a_binary varbinary not null, 
col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; + conn.createStatement().execute(ddl); + final Configuration configuration = new Configuration (); + configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); + final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; + assertEquals(expectedSelectStatement, selectStatement); + } finally { + conn.close(); + } + } + + @Test + public void testSelectStatementForSpecificColumns() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + final String tableName = "TEST_TABLE"; + try { + String ddl = "CREATE TABLE "+ tableName + + " (a_string varchar not null, a_binary varbinary not null, col1 integer" + + " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; + conn.createStatement().execute(ddl); + final Configuration configuration = new Configuration (); + configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + PhoenixConfigurationUtil.setSelectColumnNames(configuration, "A_BINARY"); + final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); + final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; + assertEquals(expectedSelectStatement, selectStatement); + } finally { + conn.close(); + } + } + + @Test + public void testSelectStatementForArrayTypes() throws Exception { + Connection conn = DriverManager.getConnection(getUrl(), PropertiesUtil.deepCopy(TestUtil.TEST_PROPERTIES)); + final String tableName = "TEST_TABLE"; + try { + String ddl = "CREATE TABLE "+ tableName + + " (ID BIGINT NOT NULL PRIMARY KEY, VCARRAY 
VARCHAR[])\n"; + conn.createStatement().execute(ddl); + final Configuration configuration = new Configuration (); + configuration.set(HConstants.ZOOKEEPER_QUORUM, getUrl()); + PhoenixConfigurationUtil.setSelectColumnNames(configuration,"ID,VCARRAY"); + PhoenixConfigurationUtil.setSchemaType(configuration, SchemaType.QUERY); + PhoenixConfigurationUtil.setInputTableName(configuration, tableName); + final String selectStatement = PhoenixConfigurationUtil.getSelectStatement(configuration); + final String expectedSelectStatement = "SELECT \"ID\",\"0\".\"VCARRAY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; + assertEquals(expectedSelectStatement, selectStatement); + } finally { + conn.close(); + } + } +} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java ---------------------------------------------------------------------- diff --git a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java index 182eb56..33e3b5a 100644 --- a/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java +++ b/phoenix-core/src/test/java/org/apache/phoenix/util/QueryUtilTest.java @@ -64,7 +64,7 @@ public class QueryUtilTest { public void testConstructSelectStatement() { assertEquals( "SELECT \"ID\",\"NAME\" FROM \"MYTAB\"", - QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN))); + QueryUtil.constructSelectStatement("MYTAB", ImmutableList.of(ID_COLUMN,NAME_COLUMN),null)); } /** http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java b/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java deleted file mode 100644 
index efbfbf8..0000000 --- a/phoenix-pig/src/it/java/org/apache/phoenix/pig/PhoenixPigConfigurationIT.java +++ /dev/null @@ -1,109 +0,0 @@ -/* - * Copyright 2010 The Apache Software Foundation - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - *distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you maynot use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicablelaw or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.phoenix.pig; - -import static org.apache.phoenix.util.PhoenixRuntime.JDBC_PROTOCOL_TERMINATOR; -import static org.apache.phoenix.util.PhoenixRuntime.PHOENIX_TEST_DRIVER_URL_PARAM; -import static org.apache.phoenix.util.TestUtil.TEST_PROPERTIES; -import static org.junit.Assert.assertEquals; - -import java.sql.Connection; -import java.sql.DriverManager; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.phoenix.end2end.BaseHBaseManagedTimeIT; -import org.apache.phoenix.util.PropertiesUtil; -import org.apache.phoenix.util.SchemaUtil; -import org.apache.phoenix.util.TestUtil; -import org.junit.Test; - - -public class PhoenixPigConfigurationIT extends BaseHBaseManagedTimeIT { - private static final String zkQuorum = TestUtil.LOCALHOST + JDBC_PROTOCOL_TERMINATOR + PHOENIX_TEST_DRIVER_URL_PARAM; - - @Test - public void testUpsertStatement() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); 
- Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - final String tableName = "TEST_TABLE"; - try { - String ddl = "CREATE TABLE "+ tableName + - " (a_string varchar not null, a_binary varbinary not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; - createTestTable(getUrl(), ddl); - final PhoenixPigConfiguration configuration = newConfiguration (tableName); - final String upserStatement = configuration.getUpsertStatement(); - final String expectedUpsertStatement = "UPSERT INTO " + tableName + " VALUES (?, ?, ?)"; - assertEquals(expectedUpsertStatement, upserStatement); - } finally { - conn.close(); - } - } - - @Test - public void testSelectStatement() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - final String tableName = "TEST_TABLE"; - try { - String ddl = "CREATE TABLE "+ tableName + - " (a_string varchar not null, a_binary varbinary not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY (a_string, a_binary))\n"; - createTestTable(getUrl(), ddl); - final PhoenixPigConfiguration configuration = newConfiguration (tableName); - final String selectStatement = configuration.getSelectStatement(); - final String expectedSelectStatement = "SELECT \"A_STRING\",\"A_BINARY\",\"0\".\"COL1\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; - assertEquals(expectedSelectStatement, selectStatement); - } finally { - conn.close(); - } - } - - @Test - public void testSelectStatementForSpecificColumns() throws Exception { - Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); - Connection conn = DriverManager.getConnection(getUrl(), props); - conn.setAutoCommit(false); - final String tableName = "TEST_TABLE"; - try { - String ddl = "CREATE TABLE "+ tableName + - " (a_string varchar not null, a_binary varbinary not null, col1 integer" + - " CONSTRAINT pk PRIMARY KEY 
(a_string, a_binary))\n"; - createTestTable(getUrl(), ddl); - final PhoenixPigConfiguration configuration = newConfiguration (tableName); - configuration.setSelectColumns("A_BINARY"); - final String selectStatement = configuration.getSelectStatement(); - final String expectedSelectStatement = "SELECT \"A_BINARY\" FROM " + SchemaUtil.getEscapedArgument(tableName) ; - assertEquals(expectedSelectStatement, selectStatement); - } finally { - conn.close(); - } - } - - private PhoenixPigConfiguration newConfiguration(String tableName) { - final Configuration configuration = new Configuration(); - final PhoenixPigConfiguration phoenixConfiguration = new PhoenixPigConfiguration(configuration); - phoenixConfiguration.configure(zkQuorum, tableName.toUpperCase(), 100); - return phoenixConfiguration; - } -} http://git-wip-us.apache.org/repos/asf/phoenix/blob/b70f3abd/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java ---------------------------------------------------------------------- diff --git a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java index d8bedf6..1218e82 100644 --- a/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java +++ b/phoenix-pig/src/main/java/org/apache/phoenix/pig/PhoenixHBaseLoader.java @@ -28,18 +28,21 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.mapreduce.TableMapReduceUtil; import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.mapreduce.InputFormat; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.phoenix.pig.PhoenixPigConfiguration.SchemaType; -import org.apache.phoenix.pig.hadoop.PhoenixInputFormat; 
-import org.apache.phoenix.pig.hadoop.PhoenixRecord; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil.SchemaType; import org.apache.phoenix.pig.util.PhoenixPigSchemaUtil; import org.apache.phoenix.pig.util.QuerySchemaParserFunction; import org.apache.phoenix.pig.util.TableSchemaParserFunction; import org.apache.phoenix.pig.util.TypeUtil; +import org.apache.phoenix.pig.writable.PhoenixPigDBWritable; import org.apache.pig.Expression; import org.apache.pig.LoadFunc; import org.apache.pig.LoadMetadata; @@ -83,12 +86,12 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { private static final String PHOENIX_QUERY_SCHEME = "hbase://query/"; private static final String RESOURCE_SCHEMA_SIGNATURE = "phoenix.pig.schema"; - private PhoenixPigConfiguration config; + private Configuration config; private String tableName; private String selectQuery; private String zkQuorum ; - private PhoenixInputFormat inputFormat; - private RecordReader<NullWritable, PhoenixRecord> reader; + private PhoenixInputFormat<PhoenixPigDBWritable> inputFormat; + private RecordReader<NullWritable,PhoenixPigDBWritable> reader; private String contextSignature; private ResourceSchema schema; @@ -107,6 +110,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { final Configuration configuration = job.getConfiguration(); //explicitly turning off combining splits. 
configuration.setBoolean("pig.noSplitCombination", true); + //to have phoenix working on a secured cluster + TableMapReduceUtil.initCredentials(job); this.initializePhoenixPigConfiguration(location, configuration); } @@ -120,21 +125,22 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { if(this.config != null) { return; } - this.config = new PhoenixPigConfiguration(configuration); - this.config.setServerName(this.zkQuorum); + this.config = configuration; + this.config.set(HConstants.ZOOKEEPER_QUORUM,this.zkQuorum); + PhoenixConfigurationUtil.setInputClass(this.config, PhoenixPigDBWritable.class); Pair<String,String> pair = null; try { if (location.startsWith(PHOENIX_TABLE_NAME_SCHEME)) { String tableSchema = location.substring(PHOENIX_TABLE_NAME_SCHEME.length()); final TableSchemaParserFunction parseFunction = new TableSchemaParserFunction(); pair = parseFunction.apply(tableSchema); - this.config.setSchemaType(SchemaType.TABLE); + PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.TABLE); } else if (location.startsWith(PHOENIX_QUERY_SCHEME)) { this.selectQuery = location.substring(PHOENIX_QUERY_SCHEME.length()); final QuerySchemaParserFunction queryParseFunction = new QuerySchemaParserFunction(this.config); pair = queryParseFunction.apply(this.selectQuery); - config.setSelectStatement(this.selectQuery); - this.config.setSchemaType(SchemaType.QUERY); + PhoenixConfigurationUtil.setInputQuery(this.config, this.selectQuery); + PhoenixConfigurationUtil.setSchemaType(this.config, SchemaType.QUERY); } this.tableName = pair.getFirst(); final String selectedColumns = pair.getSecond(); @@ -142,9 +148,9 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { if(isEmpty(this.tableName) && isEmpty(this.selectQuery)) { printUsage(location); } - this.config.setTableName(this.tableName); + PhoenixConfigurationUtil.setInputTableName(this.config, this.tableName); if(!isEmpty(selectedColumns)) { - 
this.config.setSelectColumns(selectedColumns); + PhoenixConfigurationUtil.setSelectColumnNames(this.config, selectedColumns); } } catch(IllegalArgumentException iae) { printUsage(location); @@ -160,7 +166,8 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { @Override public InputFormat getInputFormat() throws IOException { if(inputFormat == null) { - inputFormat = new PhoenixInputFormat(); + inputFormat = new PhoenixInputFormat<PhoenixPigDBWritable>(); + PhoenixConfigurationUtil.setInputClass(this.config,PhoenixPigDBWritable.class); } return inputFormat; } @@ -188,13 +195,13 @@ public final class PhoenixHBaseLoader extends LoadFunc implements LoadMetadata { public Tuple getNext() throws IOException { try { if(!reader.nextKeyValue()) { - return null; - } - final PhoenixRecord phoenixRecord = reader.getCurrentValue(); - if(phoenixRecord == null) { + return null; + } + final PhoenixPigDBWritable record = reader.getCurrentValue(); + if(record == null) { return null; } - final Tuple tuple = TypeUtil.transformToTuple(phoenixRecord,schema.getFields()); + final Tuple tuple = TypeUtil.transformToTuple(record,schema.getFields()); return tuple; } catch (InterruptedException e) { int errCode = 6018;
