PHOENIX-1973 Improve CsvBulkLoadTool performance by moving KeyValue construction from the map phase to the reduce phase (Sergey Soldatov)


Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/e797b36c
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/e797b36c
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/e797b36c

Branch: refs/heads/calcite
Commit: e797b36c2ce42e9b9fd6b37fd8b9f79f79d6f18f
Parents: 60ef7cd
Author: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Authored: Tue Feb 16 12:12:23 2016 +0530
Committer: Rajeshbabu Chintaguntla <rajeshb...@apache.org>
Committed: Tue Feb 16 12:12:23 2016 +0530

----------------------------------------------------------------------
 .../phoenix/mapreduce/AbstractBulkLoadTool.java |   6 +-
 .../mapreduce/FormatToKeyValueMapper.java       | 164 ++++++++++++++++---
 .../mapreduce/FormatToKeyValueReducer.java      | 127 ++++++++++++--
 .../bulkload/TargetTableRefFunctions.java       |  22 ++-
 4 files changed, 281 insertions(+), 38 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
index 39ee4b1..ab2848f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/AbstractBulkLoadTool.java
@@ -41,6 +41,7 @@ import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hbase.HBaseConfiguration;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.LoadIncrementalHFiles;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
@@ -268,7 +269,7 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
 
         job.setInputFormatClass(TextInputFormat.class);
         job.setMapOutputKeyClass(TableRowkeyPair.class);
-        job.setMapOutputValueClass(KeyValue.class);
+        job.setMapOutputValueClass(ImmutableBytesWritable.class);
         job.setOutputKeyClass(TableRowkeyPair.class);
         job.setOutputValueClass(KeyValue.class);
         job.setReducerClass(FormatToKeyValueReducer.class);
@@ -276,7 +277,10 @@ public abstract class AbstractBulkLoadTool extends Configured implements Tool {
         MultiHfileOutputFormat.configureIncrementalLoad(job, tablesToBeLoaded);
 
         final String tableNamesAsJson = TargetTableRefFunctions.NAMES_TO_JSON.apply(tablesToBeLoaded);
+        final String logicalNamesAsJson = TargetTableRefFunctions.LOGICAN_NAMES_TO_JSON.apply(tablesToBeLoaded);
+
         job.getConfiguration().set(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY,tableNamesAsJson);
+        job.getConfiguration().set(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY,logicalNamesAsJson);
 
         // give subclasses their hook
         setupJob(job);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
index 7e115e5..95b099e 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueMapper.java
@@ -17,30 +17,30 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.sql.SQLException;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 import javax.annotation.Nullable;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.KeyValue;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Mapper;
 import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
 import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
 import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
-import org.apache.phoenix.util.ColumnInfo;
-import org.apache.phoenix.util.PhoenixRuntime;
-import org.apache.phoenix.util.QueryUtil;
-import org.apache.phoenix.util.UpsertExecutor;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,7 +59,7 @@ import com.google.common.collect.Lists;
  * to retrieve {@link ColumnInfo} from the target table.
  */
 public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable, Text, TableRowkeyPair,
-        KeyValue> {
+        ImmutableBytesWritable> {
 
     protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueMapper.class);
 
@@ -79,6 +79,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     /** Configuration key for the table names */
     public static final String TABLE_NAMES_CONFKEY = "phoenix.mapreduce.import.tablenames";
+    public static final String LOGICAL_NAMES_CONFKEY = "phoenix.mapreduce.import.logicalnames";
 
     /** Configuration key for the table configurations */
     public static final String TABLE_CONFIG_CONFKEY = "phoenix.mapreduce.import.table.config";
@@ -94,8 +95,14 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     protected UpsertExecutor<RECORD, ?> upsertExecutor;
     protected ImportPreUpsertKeyValueProcessor preUpdateProcessor;
     protected List<String> tableNames;
+    protected List<String> logicalNames;
     protected MapperUpsertListener<RECORD> upsertListener;
 
+    /*
+    lookup table for column index. Index in the List matches to the index in tableNames List
+     */
+    protected List<Map<byte[], Map<byte[], Integer>>> columnIndexes;
+
     protected abstract UpsertExecutor<RECORD,?> buildUpsertExecutor(Configuration conf);
     protected abstract LineParser<RECORD> getLineParser();
 
@@ -112,13 +119,17 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
         try {
             conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+
+            final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = initColumnIndexes();
         } catch (SQLException | ClassNotFoundException e) {
             throw new RuntimeException(e);
         }
 
-        final String tableNamesConf = conf.get(TABLE_NAMES_CONFKEY);
-        tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
-
         upsertListener = new MapperUpsertListener<RECORD>(
                 context, conf.getBoolean(IGNORE_INVALID_ROW_CONFKEY, true));
         upsertExecutor = buildUpsertExecutor(conf);
@@ -127,7 +138,8 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
 
     @SuppressWarnings("deprecation")
     @Override
-    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
+    protected void map(LongWritable key, Text value, Context context) throws IOException,
+            InterruptedException {
         if (conn == null) {
             throw new RuntimeException("Connection not initialized.");
         }
@@ -145,7 +157,7 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 return;
             }
             upsertExecutor.execute(ImmutableList.<RECORD>of(record));
-
+            Map<Integer, List<KeyValue>> map = new HashMap<>();
             Iterator<Pair<byte[], List<KeyValue>>> uncommittedDataIterator
                     = PhoenixRuntime.getUncommittedDataIterator(conn, true);
             while (uncommittedDataIterator.hasNext()) {
@@ -153,24 +165,125 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
                 List<KeyValue> keyValueList = kvPair.getSecond();
                 keyValueList = preUpdateProcessor.preUpsert(kvPair.getFirst(), keyValueList);
                 byte[] first = kvPair.getFirst();
-                for (String tableName : tableNames) {
-                    if (Bytes.compareTo(Bytes.toBytes(tableName), first) != 0) {
-                        // skip edits for other tables
-                        continue;
-                    }
-                    for (KeyValue kv : keyValueList) {
-                        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
-                        outputKey.set(kv.getRowArray(), kv.getRowOffset(), kv.getRowLength());
-                        context.write(new TableRowkeyPair(tableName, outputKey), kv);
+                // Create a list of KV for each table
+                for (int i = 0; i < tableNames.size(); i++) {
+                    if (Bytes.compareTo(Bytes.toBytes(tableNames.get(i)), first) == 0) {
+                        if (!map.containsKey(i)) {
+                            map.put(i, new ArrayList<KeyValue>());
+                        }
+                        List<KeyValue> list = map.get(i);
+                        for (KeyValue kv : keyValueList) {
+                            list.add(kv);
+                        }
+                        break;
                     }
                 }
             }
+            for(Map.Entry<Integer, List<KeyValue>> rowEntry : map.entrySet()) {
+                int tableIndex = rowEntry.getKey();
+                List<KeyValue> lkv = rowEntry.getValue();
+                // All KV values combines to a single byte array
+                writeAggregatedRow(context, tableIndex, lkv);
+            }
             conn.rollback();
         } catch (Exception e) {
             throw new RuntimeException(e);
         }
     }
 
+    private List<Map<byte[],Map<byte[], Integer>>> initColumnIndexes() throws SQLException {
+        List<Map<byte[], Map<byte[], Integer>>> tableMap = new ArrayList<>();
+        int tableIndex;
+        for (tableIndex = 0; tableIndex < tableNames.size(); tableIndex++) {
+            PTable table = PhoenixRuntime.getTable(conn, logicalNames.get(tableIndex));
+            Map<byte[], Map<byte[], Integer>> columnMap = new TreeMap<>(Bytes.BYTES_COMPARATOR);
+            List<PColumn> cls = table.getColumns();
+            for(int i = 0; i < cls.size(); i++) {
+                PColumn c = cls.get(i);
+                if(c.getFamilyName() == null) continue; // Skip PK column
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                if(!columnMap.containsKey(family)) {
+                    columnMap.put(family, new TreeMap<byte[], Integer>(Bytes.BYTES_COMPARATOR));
+                }
+                Map<byte[], Integer> qualifier = columnMap.get(family);
+                qualifier.put(name, i);
+            }
+            tableMap.add(columnMap);
+        }
+        return tableMap;
+    }
+
+    /**
+     * Find the column index which will replace the column name in
+     * the aggregated array and will be restored in Reducer
+     *
+     * @param tableIndex Table index in tableNames list
+     * @param cell KeyValue for the column
+     * @return column index for the specified cell or -1 if was not found
+     */
+    private int findIndex(int tableIndex, Cell cell) {
+        Map<byte[], Map<byte[], Integer>> columnMap = columnIndexes.get(tableIndex);
+        Map<byte[], Integer> qualifiers = columnMap.get(Bytes.copy(cell.getFamilyArray(),
+                cell.getFamilyOffset(), cell.getFamilyLength()));
+        if(qualifiers!= null) {
+            Integer result = qualifiers.get(Bytes.copy(cell.getQualifierArray(),
+                    cell.getQualifierOffset(), cell.getQualifierLength()));
+            if(result!=null) {
+                return result;
+            }
+        }
+        return -1;
+    }
+
+    /**
+     * Collect all column values for the same rowKey
+     *
+     * @param context Current mapper context
+     * @param tableIndex Table index in tableNames list
+     * @param lkv List of KV values that will be combined in a single ImmutableBytesWritable
+     * @throws IOException
+     * @throws InterruptedException
+     */
+
+    private void writeAggregatedRow(Context context, int tableIndex, List<KeyValue> lkv)
+            throws IOException, InterruptedException {
+        TrustedByteArrayOutputStream bos = new TrustedByteArrayOutputStream(1024);
+        DataOutputStream outputStream = new DataOutputStream(bos);
+        ImmutableBytesWritable outputKey = new ImmutableBytesWritable();
+        if (!lkv.isEmpty()) {
+            // All Key Values for the same row are supposed to be the same, so init rowKey only once
+            Cell first = lkv.get(0);
+            outputKey.set(first.getRowArray(), first.getRowOffset(), first.getRowLength());
+            for (KeyValue cell : lkv) {
+                if(isEmptyCell(cell)) {
+                    continue;
+                }
+                int i = findIndex(tableIndex, cell);
+                if (i == -1) {
+                    throw new IOException("No column found for KeyValue");
+                }
+                WritableUtils.writeVInt(outputStream, i);
+                outputStream.writeByte(cell.getTypeByte());
+                WritableUtils.writeVInt(outputStream, cell.getValueLength());
+                outputStream.write(cell.getValueArray(), cell.getValueOffset(), cell.getValueLength());
+            }
+        }
+        ImmutableBytesWritable aggregatedArray = new ImmutableBytesWritable(bos.toByteArray());
+        outputStream.close();
+        context.write(new TableRowkeyPair(Integer.toString(tableIndex), outputKey), aggregatedArray);
+    }
+
+    protected boolean isEmptyCell(KeyValue cell) {
+        if (Bytes.compareTo(cell.getQualifierArray(), cell.getQualifierOffset(),
+                cell.getQualifierLength(), QueryConstants.EMPTY_COLUMN_BYTES, 0,
+                QueryConstants.EMPTY_COLUMN_BYTES.length) != 0)
+            return false;
+        else
+            return true;
+    }
+
+
     @Override
     protected void cleanup(Context context) throws IOException, InterruptedException {
         try {
@@ -223,11 +336,12 @@ public abstract class FormatToKeyValueMapper<RECORD> extends Mapper<LongWritable
     @VisibleForTesting
     static class MapperUpsertListener<T> implements UpsertExecutor.UpsertListener<T> {
 
-        private final Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context;
+        private final Mapper<LongWritable, Text,
+                TableRowkeyPair, ImmutableBytesWritable>.Context context;
         private final boolean ignoreRecordErrors;
 
         private MapperUpsertListener(
-                Mapper<LongWritable, Text, TableRowkeyPair, KeyValue>.Context context,
+                Mapper<LongWritable, Text, TableRowkeyPair, ImmutableBytesWritable>.Context context,
                 boolean ignoreRecordErrors) {
             this.context = context;
             this.ignoreRecordErrors = ignoreRecordErrors;

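For readers following the new data flow: writeAggregatedRow() above replaces the per-cell KeyValue shuffle with a single value per (table, row key) pair, whose layout is simply a repeated record of (column index as VInt, KeyValue type byte, value length as VInt, value bytes). Per the commit summary, the point is to defer KeyValue construction to the reduce phase and keep the shuffled records small. Below is a minimal stand-alone sketch of that encoding, not the actual mapper code: a plain ByteArrayOutputStream stands in for Phoenix's TrustedByteArrayOutputStream, and the column indexes and values are hypothetical.

----------------------------------------------------------------------
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.io.WritableUtils;

public class AggregatedRowEncodingSketch {

    /** Encodes one (columnIndex, type, value) entry in the same layout writeAggregatedRow() uses. */
    static void writeEntry(DataOutputStream out, int columnIndex, KeyValue.Type type, byte[] value)
            throws IOException {
        WritableUtils.writeVInt(out, columnIndex);   // index into the table's column list
        out.writeByte(type.getCode());               // Put or DeleteColumn
        WritableUtils.writeVInt(out, value.length);  // value length (0 for a null column)
        out.write(value);                            // raw column value bytes
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream(1024);
        DataOutputStream out = new DataOutputStream(bos);
        // Two hypothetical non-PK columns of one row: index 1 carries a value, index 2 is null.
        writeEntry(out, 1, KeyValue.Type.Put, "value-1".getBytes());
        writeEntry(out, 2, KeyValue.Type.DeleteColumn, new byte[0]);
        out.close();
        System.out.println("aggregated value is " + bos.toByteArray().length + " bytes");
    }
}
----------------------------------------------------------------------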
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
index 5d00656..0f90e45 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/FormatToKeyValueReducer.java
@@ -17,36 +17,143 @@
  */
 package org.apache.phoenix.mapreduce;
 
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
 import java.io.IOException;
-import java.util.TreeSet;
+import java.sql.SQLException;
+import java.util.*;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.mapreduce.KeyValueSortReducer;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.mapreduce.Reducer;
+import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
+import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
+import org.apache.phoenix.jdbc.PhoenixConnection;
 import org.apache.phoenix.mapreduce.bulkload.TableRowkeyPair;
+import org.apache.phoenix.mapreduce.bulkload.TargetTableRefFunctions;
+import org.apache.phoenix.query.QueryConstants;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.util.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Reducer class for the bulkload jobs.
  * Performs similar functionality to {@link KeyValueSortReducer}
  */
 public class FormatToKeyValueReducer
-    extends Reducer<TableRowkeyPair,KeyValue,TableRowkeyPair,KeyValue> {
+    extends Reducer<TableRowkeyPair,ImmutableBytesWritable,TableRowkeyPair,KeyValue> {
+
+    protected static final Logger LOG = LoggerFactory.getLogger(FormatToKeyValueReducer.class);
+
+
+    protected List<String> tableNames;
+    protected List<String> logicalNames;
+    protected KeyValueBuilder builder;
+    List<List<Pair<byte[], byte[]>>> columnIndexes;
+    List<ImmutableBytesPtr> emptyFamilyName;
+
+
+    @Override
+    protected void setup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+
+        // pass client configuration into driver
+        Properties clientInfos = new Properties();
+        for (Map.Entry<String, String> entry : conf) {
+            clientInfos.setProperty(entry.getKey(), entry.getValue());
+        }
+
+        try {
+            PhoenixConnection conn = (PhoenixConnection) QueryUtil.getConnection(clientInfos, conf);
+            builder = conn.getKeyValueBuilder();
+            final String tableNamesConf = conf.get(FormatToKeyValueMapper.TABLE_NAMES_CONFKEY);
+            final String logicalNamesConf = conf.get(FormatToKeyValueMapper.LOGICAL_NAMES_CONFKEY);
+            tableNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(tableNamesConf);
+            logicalNames = TargetTableRefFunctions.NAMES_FROM_JSON.apply(logicalNamesConf);
+
+            columnIndexes = new ArrayList<>(tableNames.size());
+            emptyFamilyName = new ArrayList<>();
+            initColumnsMap(conn);
+        } catch (SQLException | ClassNotFoundException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private void initColumnsMap(PhoenixConnection conn) throws SQLException {
+        for (String tableName : logicalNames) {
+            PTable table = PhoenixRuntime.getTable(conn, tableName);
+            emptyFamilyName.add(SchemaUtil.getEmptyColumnFamilyPtr(table));
+            List<PColumn> cls = table.getColumns();
+            List<Pair<byte[], byte[]>> list = new ArrayList(cls.size());
+            for(int i = 0; i < cls.size(); i++) {
+                PColumn c = cls.get(i);
+                if(c.getFamilyName() == null) {
+                    list.add(null); // Skip PK column
+                    continue;
+                }
+                byte[] family = c.getFamilyName().getBytes();
+                byte[] name = c.getName().getBytes();
+                list.add(new Pair(family, name));
+            }
+            columnIndexes.add(list);
+        }
+
+    }
 
     @Override
-    protected void reduce(TableRowkeyPair key, Iterable<KeyValue> values,
-        Reducer<TableRowkeyPair, KeyValue, TableRowkeyPair, KeyValue>.Context context)
+    protected void reduce(TableRowkeyPair key, Iterable<ImmutableBytesWritable> values,
+        Reducer<TableRowkeyPair, ImmutableBytesWritable, TableRowkeyPair, KeyValue>.Context context)
         throws IOException, InterruptedException {
         TreeSet<KeyValue> map = new TreeSet<KeyValue>(KeyValue.COMPARATOR);
-        for (KeyValue kv: values) {
-            try {
-                map.add(kv.clone());
-            } catch (CloneNotSupportedException e) {
-                throw new java.io.IOException(e);
+        int tableIndex = Integer.parseInt(key.getTableName());
+        key.setTableName(tableNames.get(tableIndex));
+        List<Pair<byte[], byte[]>> columns = columnIndexes.get(tableIndex);
+        for (ImmutableBytesWritable aggregatedArray : values) {
+            DataInputStream input = new DataInputStream(new ByteArrayInputStream(aggregatedArray.get()));
+            while (input.available()!= 0) {
+                int index = WritableUtils.readVInt(input);
+                Pair<byte[], byte[]> pair = columns.get(index);
+                byte type = input.readByte();
+                ImmutableBytesWritable value = null;
+                int len = WritableUtils.readVInt(input);
+                if (len > 0) {
+                    byte[] array = new byte[len];
+                    input.read(array);
+                    value = new ImmutableBytesWritable(array);
+                }
+                KeyValue kv;
+                KeyValue.Type kvType = KeyValue.Type.codeToType(type);
+                switch (kvType) {
+                    case Put: // not null value
+                        kv = builder.buildPut(key.getRowkey(),
+                                new ImmutableBytesWritable(pair.getFirst()),
+                                new ImmutableBytesWritable(pair.getSecond()), value);
+                        break;
+                    case DeleteColumn: // null value
+                        kv = builder.buildDeleteColumns(key.getRowkey(),
+                                new ImmutableBytesWritable(pair.getFirst()),
+                                new ImmutableBytesWritable(pair.getSecond()));
+                        break;
+                    default:
+                        throw new IOException("Unsupported KeyValue type " + kvType);
+                }
+                map.add(kv);
             }
+            KeyValue empty = builder.buildPut(key.getRowkey(),
+                    emptyFamilyName.get(tableIndex),
+                    QueryConstants.EMPTY_COLUMN_BYTES_PTR, ByteUtil.EMPTY_BYTE_ARRAY_PTR);
+            map.add(empty);
+            Closeables.closeQuietly(input);
         }
         context.setStatus("Read " + map.getClass());
         int index = 0;
-        for (KeyValue kv: map) {
+        for (KeyValue kv : map) {
             context.write(key, kv);
             if (++index % 100 == 0) context.setStatus("Wrote " + index);
         }

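The reduce() method above walks the same layout back: it reads the column index, the type byte, and the value, looks up the (family, qualifier) pair by index in the metadata loaded during setup(), and rebuilds a Put or DeleteColumns KeyValue through the connection's KeyValueBuilder, finally adding the empty-column KeyValue for the row. The following is a matching decode-only sketch, stand-alone and not the actual reducer code, with a small sample buffer built inline instead of a shuffled ImmutableBytesWritable.

----------------------------------------------------------------------
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

import org.apache.hadoop.hbase.KeyValue;
import org.apache.hadoop.io.WritableUtils;

public class AggregatedRowDecodingSketch {

    /** Walks an aggregated byte array and prints one line per encoded column entry. */
    static void decode(byte[] aggregated) throws IOException {
        DataInputStream in = new DataInputStream(new ByteArrayInputStream(aggregated));
        while (in.available() != 0) {
            int columnIndex = WritableUtils.readVInt(in);               // index into the column list
            KeyValue.Type type = KeyValue.Type.codeToType(in.readByte());
            int len = WritableUtils.readVInt(in);
            byte[] value = new byte[len];
            in.readFully(value);                                        // empty for DeleteColumn entries
            // The real reducer resolves (family, qualifier) from columnIndex and builds the KeyValue here.
            System.out.println("column " + columnIndex + " type " + type + " valueLen " + len);
        }
        in.close();
    }

    public static void main(String[] args) throws IOException {
        // Build a one-entry sample buffer in the mapper's format, then decode it.
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        WritableUtils.writeVInt(out, 1);
        out.writeByte(KeyValue.Type.Put.getCode());
        byte[] v = "value-1".getBytes();
        WritableUtils.writeVInt(out, v.length);
        out.write(v);
        out.close();
        decode(bos.toByteArray());
    }
}
----------------------------------------------------------------------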
http://git-wip-us.apache.org/repos/asf/phoenix/blob/e797b36c/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
index d786842..e02065f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/mapreduce/bulkload/TargetTableRefFunctions.java
@@ -78,7 +78,25 @@ public class TargetTableRefFunctions {
          }
      };
 
-     public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
+    public static Function<List<TargetTableRef>,String> LOGICAN_NAMES_TO_JSON =  new Function<List<TargetTableRef>,String>() {
+
+        @Override
+        public String apply(List<TargetTableRef> input) {
+            try {
+                List<String> tableNames = Lists.newArrayListWithCapacity(input.size());
+                for(TargetTableRef table : input) {
+                    tableNames.add(table.getLogicalName());
+                }
+                ObjectMapper mapper = new ObjectMapper();
+                return mapper.writeValueAsString(tableNames);
+            } catch (IOException e) {
+                throw new RuntimeException(e);
+            }
+
+        }
+    };
+
+    public static Function<String,List<String>> NAMES_FROM_JSON =  new Function<String,List<String>>() {
 
          @SuppressWarnings("unchecked")
          @Override
@@ -92,4 +110,4 @@ public class TargetTableRefFunctions {
 
          }
      };
- }
+}

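The new LOGICAN_NAMES_TO_JSON function (identifier spelled as committed) mirrors the existing NAMES_TO_JSON: it stores the tables' logical names as a JSON array in the job configuration so both mapper and reducer can resolve PTable metadata via PhoenixRuntime.getTable(), while NAMES_FROM_JSON reads the list back. A hedged round-trip sketch of that JSON encoding follows; it assumes a Jackson 1.x ObjectMapper like the one this class already uses, and the package and table names shown are assumptions for illustration only.

----------------------------------------------------------------------
import java.util.Arrays;
import java.util.List;

import org.codehaus.jackson.map.ObjectMapper;

public class TableNamesJsonSketch {

    public static void main(String[] args) throws Exception {
        // Hypothetical logical names of a data table and its index table.
        List<String> logicalNames = Arrays.asList("MY_SCHEMA.MY_TABLE", "MY_SCHEMA.MY_TABLE_IDX");

        ObjectMapper mapper = new ObjectMapper();
        // What LOGICAN_NAMES_TO_JSON would store under phoenix.mapreduce.import.logicalnames.
        String json = mapper.writeValueAsString(logicalNames);
        System.out.println(json);   // ["MY_SCHEMA.MY_TABLE","MY_SCHEMA.MY_TABLE_IDX"]

        // What NAMES_FROM_JSON would read back in the mapper and reducer setup.
        @SuppressWarnings("unchecked")
        List<String> back = mapper.readValue(json, List.class);
        System.out.println(back);
    }
}
----------------------------------------------------------------------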