Author: rohini
Date: Mon Apr 11 23:00:05 2016
New Revision: 1738662
URL: http://svn.apache.org/viewvc?rev=1738662&view=rev
Log:
PIG-4853: Fetch inputs before starting outputs
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon Apr 11 23:00:05 2016
@@ -24,6 +24,8 @@ INCOMPATIBLE CHANGES
IMPROVEMENTS
+PIG-4853: Fetch inputs before starting outputs (rohini)
+
PIG-4847: POPartialAgg processing and spill improvements (rohini)
PIG-4840: Do not turn off UnionOptimizer for unsupported storefuncs in case of
no vertex groups (rohini)
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POCounterStatsTez.java
Mon Apr 11 23:00:05 2016
@@ -56,6 +56,7 @@ public class POCounterStatsTez extends P
private transient KeyValuesReader reader;
private transient KeyValueWriter writer;
private transient boolean finished = false;
+ private transient boolean hasNext = false;
public POCounterStatsTez(OperatorKey k) {
super(k);
@@ -88,6 +89,7 @@ public class POCounterStatsTez extends P
try {
reader = (KeyValuesReader) input.getReader();
LOG.info("Attached input from vertex " + inputKey + " : input=" +
input + ", reader=" + reader);
+ hasNext = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -130,12 +132,13 @@ public class POCounterStatsTez extends P
Integer key = null;
Long value = null;
// Read count of records per task
- while (reader.next()) {
+ while (hasNext) {
key = ((IntWritable)reader.getCurrentKey()).get();
for (Object val : reader.getCurrentValues()) {
value = ((LongWritable)val).get();
counterRecords.put(key, value);
}
+ hasNext = reader.next();
}
// BinInterSedes only takes String for map key
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POFRJoinTez.java
Mon Apr 11 23:00:05 2016
@@ -101,9 +101,13 @@ public class POFRJoinTez extends POFRJoi
LogicalInput input = inputs.get(key);
if (!this.replInputs.contains(input)) {
this.replInputs.add(input);
- this.replReaders.add((KeyValueReader) input.getReader());
+ KeyValueReader reader = (KeyValueReader) input.getReader();
+ this.replReaders.add(reader);
+ log.info("Attached input from vertex " + key + " : input="
+ input + ", reader=" + reader);
}
}
+ // Do not force fetch the input by reading the first record here. In cases
+ // like MultiQuery_Union_4, multiple POFRJoinTez operators load the same
+ // replicated input, so reading ahead here would cause records to be skipped.
} catch (Exception e) {
throw new ExecException(e);
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POIdentityInOutTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POIdentityInOutTez extends
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean finished = false;
+ private transient boolean hasNext = false;
public POIdentityInOutTez(OperatorKey k, POLocalRearrange inputRearrange,
String inputKey) {
super(inputRearrange);
@@ -95,9 +96,12 @@ public class POIdentityInOutTez extends
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
+ // Force input fetch
+ hasNext = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
+ hasNext = shuffleReader.next();
}
LOG.info("Attached input from vertex " + inputKey + " : input=" +
input + ", reader=" + r);
} catch (Exception e) {
@@ -127,7 +131,7 @@ public class POIdentityInOutTez extends
return RESULT_EOP;
}
if (shuffleInput) {
- while (shuffleReader.next()) {
+ while (hasNext) {
Object curKey = shuffleReader.getCurrentKey();
Iterable<Object> vals = shuffleReader.getCurrentValues();
if (isSkewedJoin) {
@@ -139,9 +143,10 @@ public class POIdentityInOutTez extends
for (Object val : vals) {
writer.write(curKey, val);
}
+ hasNext = shuffleReader.next();
}
} else {
- while (reader.next()) {
+ while (hasNext) {
if (isSkewedJoin) {
NullablePartitionWritable wrappedKey = new
NullablePartitionWritable(
(PigNullableWritable) reader.getCurrentKey());
@@ -155,6 +160,7 @@ public class POIdentityInOutTez extends
writer.write(reader.getCurrentKey(),
reader.getCurrentValue());
}
+ hasNext = reader.next();
}
}
finished = true;
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/PORankTez.java
Mon Apr 11 23:00:05 2016
@@ -51,6 +51,7 @@ public class PORankTez extends PORank im
private transient Map<Integer, Long> counterOffsets;
private transient Configuration conf;
private transient boolean finished = false;
+ private transient Boolean hasFirstRecord;
public PORankTez(PORank copy) {
super(copy);
@@ -100,6 +101,7 @@ public class PORankTez extends PORank im
try {
reader = (KeyValueReader) input.getReader();
LOG.info("Attached input from vertex " + tuplesInputKey + " :
input=" + input + ", reader=" + reader);
+ hasFirstRecord = reader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -140,9 +142,18 @@ public class PORankTez extends PORank im
Result inp = null;
try {
- while (reader.next()) {
- inp = new Result(POStatus.STATUS_OK, reader.getCurrentValue());
- return addRank(inp);
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ inp = new Result(POStatus.STATUS_OK,
reader.getCurrentValue());
+ return addRank(inp);
+ }
+ hasFirstRecord = null;
+ } else {
+ while (reader.next()) {
+ inp = new Result(POStatus.STATUS_OK,
reader.getCurrentValue());
+ return addRank(inp);
+ }
}
} catch (IOException e) {
throw new ExecException(e);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffleTezLoad.java
Mon Apr 11 23:00:05 2016
@@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.WritableComparator;
import org.apache.pig.backend.executionengine.ExecException;
@@ -48,6 +50,7 @@ import org.apache.tez.runtime.library.co
public class POShuffleTezLoad extends POPackage implements TezInput {
private static final long serialVersionUID = 1L;
+ private static final Log LOG = LogFactory.getLog(POShuffleTezLoad.class);
protected List<String> inputKeys = new ArrayList<String>();
private boolean isSkewedJoin = false;
@@ -101,7 +104,10 @@ public class POShuffleTezLoad extends PO
// - Input key will be repeated, but index would be same
within a TezInput
if (!this.inputs.contains(input)) {
this.inputs.add(input);
- this.readers.add((KeyValuesReader)input.getReader());
+ KeyValuesReader reader =
(KeyValuesReader)input.getReader();
+ this.readers.add(reader);
+ LOG.info("Attached input from vertex " + inputKey
+ + " : input=" + input + ", reader=" + reader);
}
}
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POShuffledValueInputTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POShuffledValueInputTez ext
private transient Iterator<KeyValueReader> readers;
private transient KeyValueReader currentReader;
private transient Configuration conf;
+ private transient Boolean hasFirstRecord;
public POShuffledValueInputTez(OperatorKey k) {
super(k);
@@ -98,6 +99,8 @@ public class POShuffledValueInputTez ext
}
readers = readersList.iterator();
currentReader = readers.next();
+ // Force input fetch
+ hasFirstRecord = currentReader.next();
} catch (Exception e) {
throw new ExecException(e);
}
@@ -111,7 +114,15 @@ public class POShuffledValueInputTez ext
}
do {
- if (currentReader.next()) {
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ Tuple origTuple = (Tuple)
currentReader.getCurrentValue();
+ Tuple copy =
mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ hasFirstRecord = null;
+ } else if (currentReader.next()) {
Tuple origTuple = (Tuple) currentReader.getCurrentValue();
Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
return new Result(POStatus.STATUS_OK, copy);
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/plan/operator/POValueInputTez.java
Mon Apr 11 23:00:05 2016
@@ -57,6 +57,7 @@ public class POValueInputTez extends Phy
private transient KeyValuesReader shuffleReader;
private transient boolean shuffleInput;
private transient boolean hasNext;
+ private transient Boolean hasFirstRecord;
public POValueInputTez(OperatorKey k) {
super(k);
@@ -92,6 +93,8 @@ public class POValueInputTez extends Phy
Reader r = input.getReader();
if (r instanceof KeyValueReader) {
reader = (KeyValueReader) r;
+ // Force input fetch
+ hasFirstRecord = reader.next();
} else {
shuffleInput = true;
shuffleReader = (KeyValuesReader) r;
@@ -118,10 +121,22 @@ public class POValueInputTez extends Phy
}
hasNext = shuffleReader.next();
}
- } else if (reader.next()) {
- Tuple origTuple = (Tuple) reader.getCurrentValue();
- Tuple copy = mTupleFactory.newTuple(origTuple.getAll());
- return new Result(POStatus.STATUS_OK, copy);
+ } else {
+ if (hasFirstRecord != null) {
+ if (hasFirstRecord) {
+ hasFirstRecord = null;
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy =
mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ hasFirstRecord = null;
+ } else {
+ while (reader.next()) {
+ Tuple origTuple = (Tuple) reader.getCurrentValue();
+ Tuple copy =
mTupleFactory.newTuple(origTuple.getAll());
+ return new Result(POStatus.STATUS_OK, copy);
+ }
+ }
}
finished = true;
// For certain operators (such as STREAM), we could still have
some work
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java?rev=1738662&r1=1738661&r2=1738662&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/runtime/TezInput.java
Mon Apr 11 23:00:05 2016
@@ -43,6 +43,15 @@ public interface TezInput {
*/
public void addInputsToSkip(Set<String> inputsToSkip);
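+ /**
+ * Attach the inputs to the operator. Implementations should also call
+ * reader.next() to force fetch the input, so that all inputs are fetched
+ * and their memory released before memory is allocated for outputs.
+ * An implementation may skip the forced fetch when reading ahead would
+ * cause records to be skipped, e.g. when several operators share the
+ * same input.
+ *
+ * @param inputs available inputs
+ * @param conf configuration
+ * @throws ExecException if attaching an input or fetching its first record fails
+ */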
public void attachInputs(Map<String, LogicalInput> inputs,
Configuration conf) throws ExecException;