Author: szita
Date: Mon May 22 19:46:08 2017
New Revision: 1795844
URL: http://svn.apache.org/viewvc?rev=1795844&view=rev
Log:
PIG-4748: DateTimeWritable forgets Chronology (szita)
Added:
pig/trunk/test/org/apache/pig/test/TestDateTime.java
Removed:
pig/trunk/test/org/apache/pig/test/TestDefaultDateTimeZone.java
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
Modified: pig/trunk/CHANGES.txt
URL:
http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Mon May 22 19:46:08 2017
@@ -97,6 +97,8 @@ OPTIMIZATIONS
Â
BUG FIXES
+PIG-4748: DateTimeWritable forgets Chronology (szita)
+
PIG-5229: TestPigTest.testSpecificOrderOutput and
testSpecificOrderOutputForAlias failing (knoguchi)
PIG-5226: PreprocessorContext.java can deadlock forever with large stderr
(jtolar via knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/DateTimeWritable.java Mon May
22 19:46:08 2017
@@ -21,20 +21,27 @@ package org.apache.pig.backend.hadoop;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Properties;
+import java.util.TreeSet;
import org.joda.time.DateTime;
import org.joda.time.DateTimeZone;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.pig.impl.PigImplConstants;
+import org.apache.pig.impl.util.UDFContext;
/**
- * Writable for Double values.
+ * Writable for DateTime values.
*/
public class DateTimeWritable implements WritableComparable {
-
- private static final int ONE_MINUTE = 60000;
+ private static List<String> availableZoneIDs = null;
private DateTime value = null;
public DateTimeWritable() {
@@ -46,12 +53,32 @@ public class DateTimeWritable implements
}
public void readFields(DataInput in) throws IOException {
- value = new DateTime(in.readLong(),
DateTimeZone.forOffsetMillis(in.readShort() * ONE_MINUTE));
+ retrieveAvailableZoneList();
+
+ long instant = in.readLong();
+ int offsetInMillis = in.readInt();
+ int zoneListPos = in.readInt();
+
+ DateTimeZone timeZone = null;
+ if (zoneListPos != -1){
+ timeZone = DateTimeZone.forID(availableZoneIDs.get(zoneListPos));
+ } else {
+ timeZone = DateTimeZone.forOffsetMillis(offsetInMillis);
+ }
+
+ value = new DateTime(instant, timeZone);
}
public void write(DataOutput out) throws IOException {
+ retrieveAvailableZoneList();
+
+ String zoneId = value.getZone().getID();
+ int offsetInMillis = value.getZone().getOffset(0L);
+ int zoneListPos = availableZoneIDs.indexOf(zoneId);
+
out.writeLong(value.getMillis());
- out.writeShort(value.getZone().getOffset(value) / ONE_MINUTE);
+ out.writeInt(offsetInMillis);
+ out.writeInt(zoneListPos);
}
public void set(DateTime dt) {
@@ -62,6 +89,26 @@ public class DateTimeWritable implements
return value;
}
+ private void retrieveAvailableZoneList() throws IOException {
+ if (availableZoneIDs != null){
+ return;
+ }
+ Properties props =
UDFContext.getUDFContext().getUDFProperties(PigImplConstants.PIG_DATETIME_ZONES_LIST.getClass());
+ Collection<String> zoneList =
StringUtils.getStringCollection(props.getProperty(PigImplConstants.PIG_DATETIME_ZONES_LIST));
+ if (zoneList == null || zoneList.size() == 0){
+ throw new IOException("Datetime zone information not set");
+ }
+ availableZoneIDs = new ArrayList<>(zoneList);
+ }
+
+ public static void setupAvailableZoneIds() {
+ TreeSet<String> sortedZoneIDs = new
TreeSet<>(DateTimeZone.getAvailableIDs());
+ Properties props = UDFContext.getUDFContext().getUDFProperties(
+ PigImplConstants.PIG_DATETIME_ZONES_LIST.getClass());
+ props.setProperty(PigImplConstants.PIG_DATETIME_ZONES_LIST,
StringUtils.arrayToString(
+ sortedZoneIDs.toArray(new String[0])));
+ }
+
/**
* Returns true iff <code>o</code> is a DateTimeWritable with the same
* value.
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/JobControlCompiler.java
Mon May 22 19:46:08 2017
@@ -70,6 +70,7 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.PigJobControl;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
@@ -533,6 +534,9 @@ public class JobControlCompiler{
conf.get(MRConfiguration.JOB_REDUCE_MARKRESET_BUFFER_PERCENT));
}
+
+ DateTimeWritable.setupAvailableZoneIds();
+
configureCompression(conf);
try{
Modified:
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
---
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
(original)
+++
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/tez/TezDagBuilder.java
Mon May 22 19:46:08 2017
@@ -55,6 +55,7 @@ import org.apache.pig.PigConfiguration;
import org.apache.pig.PigException;
import org.apache.pig.StoreFuncInterface;
import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
import org.apache.pig.backend.hadoop.HDataType;
import org.apache.pig.backend.hadoop.datastorage.ConfigurationUtil;
import org.apache.pig.backend.hadoop.executionengine.JobCreationException;
@@ -636,6 +637,8 @@ public class TezDagBuilder extends TezOp
payloadConf.setBoolean(PigImplConstants.PIG_EXECTYPE_MODE_LOCAL,
pc.getExecType().isLocal());
payloadConf.set(PigImplConstants.PIG_LOG4J_PROPERTIES,
ObjectSerializer.serialize(pc.getLog4jProperties()));
+ DateTimeWritable.setupAvailableZoneIds();
+
// Process stores
LinkedList<POStore> stores = processStores(tezOp, payloadConf, job);
Modified: pig/trunk/src/org/apache/pig/impl/PigImplConstants.java
URL:
http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigImplConstants.java?rev=1795844&r1=1795843&r2=1795844&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigImplConstants.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigImplConstants.java Mon May 22 19:46:08
2017
@@ -85,6 +85,11 @@ public class PigImplConstants {
*/
public static final String PIG_AUDIT_ID = "pig.script.id";
+ /**
+ * Used to carry zone ID list from frontend to backend (generated by
frontend during Job creation)
+ */
+ public static final String PIG_DATETIME_ZONES_LIST =
"pig.datetime.zones.list";
+
// Kill the jobs before cleaning up tmp files
public static int SHUTDOWN_HOOK_JOB_KILL_PRIORITY = 3;
public static int SHUTDOWN_HOOK_TMP_FILES_CLEANUP_PRIORITY = 2;
Added: pig/trunk/test/org/apache/pig/test/TestDateTime.java
URL:
http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestDateTime.java?rev=1795844&view=auto
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestDateTime.java (added)
+++ pig/trunk/test/org/apache/pig/test/TestDateTime.java Mon May 22 19:46:08
2017
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import org.apache.commons.io.output.ByteArrayOutputStream;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.pig.PigServer;
+import org.apache.pig.backend.hadoop.DateTimeWritable;
+import org.apache.pig.builtin.ToDate;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.util.UDFContext;
+
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.junit.After;
+import org.junit.AfterClass;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.FixMethodOrder;
+import org.junit.Test;
+import org.junit.runners.MethodSorters;
+
+import static org.junit.Assert.assertEquals;
+
+@FixMethodOrder(MethodSorters.NAME_ASCENDING)
+public class TestDateTime {
+
+ private static MiniGenericCluster cluster;
+ private static PigServer pigServer;
+ private static PigServer pigServerLocal;
+ private static File tmpFile;
+ private static DateTimeZone currentDTZ;
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ cluster = MiniGenericCluster.buildCluster();
+ pigServer = new PigServer(cluster.getExecType(),
cluster.getProperties());
+ pigServerLocal = new PigServer(Util.getLocalTestMode(), new
Properties());
+ currentDTZ = DateTimeZone.getDefault();
+
+ tmpFile = File.createTempFile("test", "txt");
+ PrintStream ps = new PrintStream(new FileOutputStream(tmpFile));
+ ps.println("1970-01-01T00:00:00.000");
+ ps.println("1970-01-01T00:00:00.000Z");
+ ps.println("1970-01-03T00:00:00.000");
+ ps.println("1970-01-03T00:00:00.000Z");
+ ps.println("1970-01-05T00:00:00.000");
+ ps.println("1970-01-05T00:00:00.000Z");
+ // for testing DST
+ ps.println("2014-02-01T00:00:00.000"); // EST
+ ps.println("2014-06-01T00:00:00.000"); // EDT
+ ps.close();
+ }
+
+ @AfterClass
+ public static void oneTimeTearDown() throws Exception {
+ cluster.shutDown();
+ tmpFile.delete();
+ }
+
+ @After
+ public void restoreDefaultTZ() throws Exception {
+ DateTimeZone.setDefault(currentDTZ);
+ }
+
+ @Before
+ public void cleanUpTmpFiles() throws Exception {
+ FileLocalizer.deleteTempFiles();
+ }
+
+ @Test
+ public void testDateTimeWritables() throws IOException {
+ Configuration jobConf = new Configuration();
+
+ DateTimeWritable.setupAvailableZoneIds();
+ UDFContext.getUDFContext().addJobConf(jobConf);
+
+ String[] testZones = new String[]{"America/Los_Angeles",
"Europe/London", "Europe/Budapest", "Japan"};
+
+ DateTime testInputBase = DateTime.now();
+ for (String zoneId : testZones){
+ DateTimeZone customTZ = DateTimeZone.forID(zoneId);
+ //Test with timezone name as zone ID
+ testDateTimeForZone(new
DateTime(testInputBase).withZoneRetainFields(customTZ));
+ //Test with offset as zone ID
+ testDateTimeForZone(new DateTime(testInputBase,
DateTimeZone.forOffsetMillis(customTZ.getOffset(testInputBase))));
+ }
+
+ }
+
+ private void testDateTimeForZone(DateTime testIn) throws IOException {
+ DateTimeWritable in = new DateTimeWritable(testIn);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(outputStream);
+ in.write(dataOut);
+ dataOut.flush();
+
+ // read from byte[]
+ DateTimeWritable out = new DateTimeWritable();
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(
+ outputStream.toByteArray());
+ DataInputStream dataIn = new DataInputStream(inputStream);
+ out.readFields(dataIn);
+
+ assertEquals(in.get(), out.get());
+ }
+
+ @Test
+ public void testLocalExecution() throws Exception {
+ Iterator<Tuple> expectedItr = generateExpectedResults(DateTimeZone
+
.forOffsetMillis(DateTimeZone.forID("+08:00").getOffset(null)));
+
pigServerLocal.getPigContext().getProperties().setProperty("pig.datetime.default.tz",
"+08:00");
+ pigServerLocal.registerQuery("a = load '"
+ + Util.encodeEscape(Util.generateURI(tmpFile.toString(),
pigServerLocal.getPigContext()))
+ + "' as (test:datetime);");
+ pigServerLocal.registerQuery("b = filter a by test <
ToDate('1970-01-04T00:00:00.000');");
+ Iterator<Tuple> actualItr = pigServerLocal.openIterator("b");
+ while (expectedItr.hasNext() && actualItr.hasNext()) {
+ Tuple expectedTuple = expectedItr.next();
+ Tuple actualTuple = actualItr.next();
+ assertEquals(expectedTuple, actualTuple);
+ }
+ assertEquals(expectedItr.hasNext(), actualItr.hasNext());
+ }
+
+ /**
+ * Tests DateTimeWritables on cluster
+ * @throws Exception
+ * Below input will trigger DateTime instances with both long zone id
(UTC) and short offset (+01:00)
+ */
+ @Test
+ public void testDateTimeZoneOnCluster() throws Exception {
+ String inputFileName = "testDateTime-input.txt";
+ String[] inputData = new String[]{ "1\t1990-01-04T12:30:00.000+01:00",
+ "2\t1990-01-04T11:30:00.000Z",
+ "3\t2001-01-01T01:00:00.000",
+ "4\t2017-02-02T15:19:00.000+01:00"
+ };
+ Util.createInputFile(cluster, inputFileName, inputData);
+
+ String script = "A = LOAD '"+inputFileName+"' AS (foo:int,
sd:datetime);" +
+ "B = group A by sd;" +
+ "C = foreach B generate group, MAX(A.foo);";
+
+ Util.registerMultiLineQuery(pigServer, script);
+
+ Iterator<Tuple> it = pigServer.openIterator("C");
+
+ //Should return last 3 rows from input
+ String sysTZOffset =
DateTimeZone.forOffsetMillis(DateTime.now().getZone().getOffset(0L)).toString();
+ Util.checkQueryOutputsAfterSortRecursive(
+ it,
+ new String[]{
+ "(1990-01-04T11:30:00.000Z,2)",
+ "(2001-01-01T01:00:00.000"+sysTZOffset+",3)",
+ "(2017-02-02T15:19:00.000+01:00,4)"
+ },
+
org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+ }
+
+ @Test
+ public void testZoneDST() throws Exception {
+ String defaultDTZ = "America/New_York"; // a timezone that uses DST
+
pigServerLocal.getPigContext().getProperties().setProperty("pig.datetime.default.tz",
defaultDTZ);
+ pigServerLocal.registerQuery("a = load '"
+ + Util.encodeEscape(Util.generateURI(tmpFile.toString(),
pigServerLocal.getPigContext()))
+ + "' as (test:datetime);");
+ pigServerLocal.registerQuery("b = filter a by test >
ToDate('2014-01-01T00:00:00.000');");
+ pigServerLocal.registerQuery("c = foreach b generate ToString(test,
'Z') as tz;");
+ Iterator<Tuple> actualItr = pigServerLocal.openIterator("c");
+
+ Tuple est = actualItr.next();
+ assertEquals(Util.buildTuple("-0500"), est);
+ Tuple edt = actualItr.next();
+ assertEquals(Util.buildTuple("-0400"), edt);
+ }
+
+ private static Iterator<Tuple> generateExpectedResults(DateTimeZone dtz)
+ throws Exception {
+ List<Tuple> expectedResults = new ArrayList<Tuple>();
+ expectedResults.add(Util.buildTuple(new DateTime(
+ "1970-01-01T00:00:00.000", dtz)));
+ expectedResults.add(Util.buildTuple(new DateTime(
+ "1970-01-01T00:00:00.000", DateTimeZone.UTC)));
+ expectedResults.add(Util.buildTuple(new DateTime(
+ "1970-01-03T00:00:00.000", dtz)));
+ expectedResults.add(Util.buildTuple(new DateTime(
+ "1970-01-03T00:00:00.000", DateTimeZone.UTC)));
+ return expectedResults.iterator();
+ }
+
+ @Test
+ public void testTimeZone() throws IOException {
+ // Usually set through "pig.datetime.default.tz"
+ String defaultDTZ = "+03:00";
+ DateTimeZone.setDefault(DateTimeZone.forID(defaultDTZ));
+ String[] inputs = {
+ "1970-01-01T00:00:00.000-08:00",
+ "1970-01-01T00:00",
+ "1970-01-01T00",
+ "1970-01-01T",
+ "1970-01T",
+ "1970T",
+ "1970-01-01T00:00-08:00",
+ "1970-01-01T00-05:00",
+ "1970-01-01T-08:00",
+ "1970-01T-08:00",
+ //"1970T+8:00", //Invalid format
+ "1970-01-01",
+ "1970-01",
+ "1970",
+ };
+ String[] expectedDTZOutputs = {
+ "-08:00",
+ defaultDTZ,
+ defaultDTZ,
+ defaultDTZ,
+ defaultDTZ,
+ defaultDTZ,
+ "-08:00",
+ "-05:00",
+ "-08:00",
+ "-08:00",
+ defaultDTZ,
+ defaultDTZ,
+ defaultDTZ
+ };
+ String[] expectedDTOutputs = {
+ "1970-01-01T00:00:00.000-08:00",
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000-08:00",
+ "1970-01-01T00:00:00.000-05:00",
+ "1970-01-01T00:00:00.000-08:00",
+ "1970-01-01T00:00:00.000-08:00",
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ,
+ "1970-01-01T00:00:00.000" + defaultDTZ
+ };
+
+ for( int i = 0; i < inputs.length; i++ ) {
+ DateTimeZone dtz = ToDate.extractDateTimeZone( inputs[i] );
+ assertEquals( expectedDTZOutputs[i], dtz.toString() );
+ DateTime dt = ToDate.extractDateTime( inputs[i] );
+ assertEquals( expectedDTOutputs[i], dt.toString() );
+ System.out.println( "\"" + dt + "\"," );
+ }
+
+ }
+
+}