liyunzhang_intel created PIG-5205:
-------------------------------------
Summary: Duplicate record key info in
GlobalRearrangeConverter#ToGroupKeyValueFunction
Key: PIG-5205
URL: https://issues.apache.org/jira/browse/PIG-5205
Project: Pig
Issue Type: Sub-task
Reporter: liyunzhang_intel
Assignee: liyunzhang_intel
In
org.apache.pig.backend.hadoop.executionengine.spark.converter.GlobalRearrangeConverter.ToGroupKeyValueFunction:
{code}
@Override
public Tuple call(Tuple2<IndexedKey, Seq<Seq<Tuple>>> input) {
try {
....
List<Iterator<Tuple>> tupleIterators = new
ArrayList<Iterator<Tuple>>();
for (int j = 0; j < bags.length; j ++) {
Seq<Tuple> bag = bags[j];
Iterator<Tuple> iterator = JavaConversions
.asJavaCollection(bag).iterator();
final int index = i;
tupleIterators.add(new IteratorTransform<Tuple, Tuple>(
iterator) {
@Override
protected Tuple transform(Tuple next) {
try {
Tuple tuple = tf.newTuple(3);
tuple.set(0, index);
# We record duplicate key info here: the key is stored in every record,
# even though out.set(0, key) sets it on the output tuple later.
# Maybe the per-record key info can be removed.
tuple.set(1, key);
tuple.set(2, next);
return tuple;
} catch (ExecException e) {
throw new RuntimeException(e);
}
}
});
++ i;
}
Tuple out = tf.newTuple(2);
out.set(0, key);
out.set(1, new IteratorUnion<Tuple>(tupleIterators.iterator()));
if (LOG.isDebugEnabled()) {
LOG.debug("ToGroupKeyValueFunction out " + out);
}
return out;
} catch (Exception e) {
throw new RuntimeException(e);
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.15#6346)