This is an automated email from the ASF dual-hosted git repository.
jackie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
The following commit(s) were added to refs/heads/master by this push:
new ce092c8 Fix the bug where time conversion is skipped when incoming
and outgoing time column name are the same (#3484)
ce092c8 is described below
commit ce092c87ccd5f5251dc08b9123f80da1280db798
Author: Xiaotian (Jackie) Jiang <[email protected]>
AuthorDate: Mon Nov 19 18:16:12 2018 -0800
Fix the bug where time conversion is skipped when incoming and outgoing
time column name are the same (#3484)
1. Refactor TimeConverter to be based on a single time granularity spec
2. Refactor TimeTransformer so it can automatically detect whether the
conversion is needed (based on the value in the record)
Added tests for the new TimeConverter and TimeTransformer
---
.../common/utils/time/DefaultTimeConverter.java | 83 ----------
.../pinot/common/utils/time/TimeConverter.java | 54 +++++--
.../common/utils/time/TimeConverterProvider.java | 27 ----
.../utils/time/DefaultTimeConverterTest.java | 174 ---------------------
.../pinot/common/utils/time/TimeConverterTest.java | 97 ++++++++++++
.../data/recordtransformer/TimeTransformer.java | 55 +++++--
.../recordtransformer/RecordTransformerTest.java | 25 ---
.../recordtransformer/TimeTransformerTest.java | 146 +++++++++++++++++
8 files changed, 324 insertions(+), 337 deletions(-)
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java
deleted file mode 100644
index c50aebc..0000000
---
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverter.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-import com.linkedin.pinot.common.data.TimeGranularitySpec.TimeFormat;
-
-
-public class DefaultTimeConverter implements TimeConverter {
-
- TimeGranularitySpec incoming;
- TimeGranularitySpec outgoing;
- private boolean conversionSupported;
- private boolean needConversion;
-
- public void init(TimeGranularitySpec incoming, TimeGranularitySpec outgoing)
{
- this.incoming = incoming;
- this.outgoing = outgoing;
- conversionSupported = false;
- needConversion = true;
- if (incoming.equals(outgoing)) {
- needConversion = false;
- }
- if (TimeFormat.EPOCH.toString().equals(incoming.getTimeFormat()) &&
TimeFormat.EPOCH.toString()
- .equals(outgoing.getTimeFormat())) {
- conversionSupported = true;
- }
- if (needConversion && !conversionSupported) {
- //TODO: Handle conversion between sdf <-> epoch
- throw new RuntimeException("Conversion from Simple Date Format to
epoch/simpleDateFormat is not supported");
- }
- }
-
- @Override
- public Object convert(Object incomingTimeValue) {
- if (incomingTimeValue == null) {
- return null;
- }
- long duration;
- if (incomingTimeValue instanceof Number) {
- duration = ((Number) incomingTimeValue).longValue();
- } else {
- duration = Long.parseLong(incomingTimeValue.toString());
- }
- if (conversionSupported) {
- long outgoingTime = outgoing.getTimeType().convert(duration *
incoming.getTimeUnitSize(), incoming.getTimeType());
- return convertToOutgoingDataType(outgoingTime /
outgoing.getTimeUnitSize());
- } else {
- //TODO: Handle conversion between sdf <-> epoch
- throw new RuntimeException("Conversion from Simple Date Format to
epoch/simpleDateFormat is not supported");
- }
- }
-
- private Object convertToOutgoingDataType(long outgoingTimeValue) {
- switch (outgoing.getDataType()) {
- case INT:
- return (int) outgoingTimeValue;
- case LONG:
- return outgoingTimeValue;
- case FLOAT:
- return (float) outgoingTimeValue;
- case DOUBLE:
- return (double) outgoingTimeValue;
- case STRING:
- return Long.toString(outgoingTimeValue);
- default:
- return outgoingTimeValue;
- }
- }
-}
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
index ddf090f..dd09be8 100644
---
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
+++
b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverter.java
@@ -15,24 +15,50 @@
*/
package com.linkedin.pinot.common.utils.time;
+import com.google.common.base.Preconditions;
import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import java.util.concurrent.TimeUnit;
+
/**
- * TimeConverter to convert inputTimeValue whose spec is defined by
incomingGranularitySpec to
- * outgoingGranularitySpec
+ * TimeConverter to convert value to/from milliseconds since epoch based on
the given {@link TimeGranularitySpec}.
*/
-public interface TimeConverter {
- /**
- * @param incomingGranularitySpec
- * @param outgoingGranularitySpec
- */
- void init(TimeGranularitySpec incomingGranularitySpec,
- TimeGranularitySpec outgoingGranularitySpec);
+public class TimeConverter {
+ private final TimeGranularitySpec _timeGranularitySpec;
+
+ public TimeConverter(TimeGranularitySpec timeGranularitySpec) {
+ Preconditions.checkArgument(
+
timeGranularitySpec.getTimeFormat().equals(TimeGranularitySpec.TimeFormat.EPOCH.toString()),
+ "Cannot perform time conversion for time format other than EPOCH");
+ _timeGranularitySpec = timeGranularitySpec;
+ }
- /**
- * @param incoming time value based on incoming time spec
- * @return time value based on outgoing time spec
- */
- Object convert(Object inputTimeValue);
+ public long toMillisSinceEpoch(Object value) {
+ long duration;
+ if (value instanceof Number) {
+ duration = ((Number) value).longValue();
+ } else {
+ duration = Long.parseLong(value.toString());
+ }
+ return _timeGranularitySpec.getTimeType().toMillis(duration *
_timeGranularitySpec.getTimeUnitSize());
+ }
+ public Object fromMillisSinceEpoch(long value) {
+ long duration = _timeGranularitySpec.getTimeType().convert(value,
TimeUnit.MILLISECONDS)
+ / _timeGranularitySpec.getTimeUnitSize();
+ switch (_timeGranularitySpec.getDataType()) {
+ case INT:
+ return (int) duration;
+ case LONG:
+ return duration;
+ case FLOAT:
+ return (float) duration;
+ case DOUBLE:
+ return (double) duration;
+ case STRING:
+ return Long.toString(duration);
+ default:
+ throw new IllegalStateException();
+ }
+ }
}
diff --git
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java
b/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java
deleted file mode 100644
index 2c5cfb8..0000000
---
a/pinot-common/src/main/java/com/linkedin/pinot/common/utils/time/TimeConverterProvider.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-
-public class TimeConverterProvider {
- public static TimeConverter getTimeConverter(TimeGranularitySpec
incomingGranularitySpec,
- TimeGranularitySpec outgoingGranularitySpec) {
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(incomingGranularitySpec, outgoingGranularitySpec);
- return timeConverter;
- }
-}
diff --git
a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java
b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java
deleted file mode 100644
index 2492bfd..0000000
---
a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/DefaultTimeConverterTest.java
+++ /dev/null
@@ -1,174 +0,0 @@
-/**
- * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package com.linkedin.pinot.common.utils.time;
-
-import static java.util.concurrent.TimeUnit.*;
-
-import org.testng.Assert;
-import org.testng.annotations.Test;
-
-import static com.linkedin.pinot.common.data.FieldSpec.DataType.*;
-import com.linkedin.pinot.common.data.TimeGranularitySpec;
-import com.linkedin.pinot.common.data.TimeGranularitySpec.TimeFormat;
-public class DefaultTimeConverterTest {
-
- @Test
- public void testWithSameTimeSpec() {
- TimeGranularitySpec spec = new TimeGranularitySpec(LONG, 1, DAYS, "1day");
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(spec, spec);
- for (int i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), i);
- }
- for (long i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).longValue(), i);
- }
- }
-
- @Test
- public void testWithDifferentTimeSpecButSameValue() {
- TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS,
"2days");
- TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 48, HOURS,
"48hours");
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- for (int i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), i);
- }
- for (long i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).longValue(), i);
- }
- }
-
- @Test
- public void testWithDifferentTimeSpecs() {
- TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS,
"2days");
- TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 24, HOURS,
"24hours");
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- for (int i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), i * 2);
- }
- for (long i = 0; i < 1000; ++i) {
- Object convertedValue = timeConverter.convert(i);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).longValue(), i * 2);
- }
- }
-
- @Test
- public void testWithDifferentIncomingValueTypes() {
- TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS,
"2days");
- TimeGranularitySpec outgoing = new TimeGranularitySpec(LONG, 24, HOURS,
"24hours");
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- Object convertedValue = timeConverter.convert("1");
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), 2);
- convertedValue = timeConverter.convert(1);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), 2);
- convertedValue = timeConverter.convert((long) 1);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), 2);
- convertedValue = timeConverter.convert((short) 1);
- Assert.assertTrue(convertedValue instanceof Long, "Converted value data
type should be Long");
- Assert.assertEquals(((Long) convertedValue).intValue(), 2);
- }
-
- @Test
- public void testWithOutgoingValueTypesString() {
- TimeGranularitySpec incoming = new TimeGranularitySpec(LONG, 2, DAYS,
"2days");
- TimeGranularitySpec outgoing = new TimeGranularitySpec(STRING, 24, HOURS,
"24hours");
- DefaultTimeConverter timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- Object convertedValue = timeConverter.convert("1");
- Assert.assertTrue(convertedValue instanceof String, "Converted value data
type should be STRING");
- Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
- Assert.assertEquals(convertedValue, "2");
- convertedValue = timeConverter.convert(1);
- Assert.assertTrue(convertedValue instanceof String, "Converted value data
type should be STRING");
- Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
- Assert.assertEquals(convertedValue, "2");
- convertedValue = timeConverter.convert((long) 1);
- Assert.assertTrue(convertedValue instanceof String, "Converted value data
type should be STRING");
- Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
- Assert.assertEquals(convertedValue, "2");
- convertedValue = timeConverter.convert((short) 1);
- Assert.assertTrue(convertedValue instanceof String, "Converted value data
type should be STRING");
- Assert.assertEquals(Integer.parseInt(convertedValue.toString()), 2);
- Assert.assertEquals(convertedValue, "2");
- }
-
- @Test
- public void testSimpleDateFormat() {
- TimeGranularitySpec incoming;
- TimeGranularitySpec outgoing;
- DefaultTimeConverter timeConverter;
- String SDF_PREFIX = TimeFormat.SIMPLE_DATE_FORMAT.toString();
-
- //this should not throw exception, since incoming == outgoing
- try {
- incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMdd", "1hour");
- outgoing = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMdd", "1hour");
- timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- } catch (Exception e) {
- Assert.fail("sdf to sdf must be supported as long as incoming sdf =
outgoing sdf");
- }
- //we don't support epoch to sdf conversion
- try {
- incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMdd", "1hour");
- outgoing = new TimeGranularitySpec(LONG, 1, HOURS, "1hour");
- timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- Assert.fail("We don't support converting epoch to sdf currently");
- } catch (Exception e) {
- //expected
- }
-
- //we don't support sdf to epoch conversion
- try {
- incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMdd", "1hour");
- outgoing = new TimeGranularitySpec(LONG, 1, HOURS, "1hour");
- timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- Assert.fail("We don't support converting sdf to epoch currently");
- } catch (Exception e) {
- //expected
- }
-
- //we don't support sdf to sdf conversion where incoming sdf != outoging sdf
- try {
- incoming = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMdd", "1hour");
- outgoing = new TimeGranularitySpec(STRING, 1, HOURS, SDF_PREFIX +
":yyyyMMddHH", "1hour");
- timeConverter = new DefaultTimeConverter();
- timeConverter.init(incoming, outgoing);
- Assert.fail("We don't support converting sdf to sdf where incoming sdf
!= outgoing sdf");
- } catch (Exception e) {
- //expected
- }
- }
-}
diff --git
a/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java
b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java
new file mode 100644
index 0000000..38087bb
--- /dev/null
+++
b/pinot-common/src/test/java/com/linkedin/pinot/common/utils/time/TimeConverterTest.java
@@ -0,0 +1,97 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.common.utils.time;
+
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import java.util.concurrent.TimeUnit;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeConverterTest {
+
+ @Test
+ public void testIntTimeColumn() {
+ TimeConverter converter =
+ new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.INT, 5,
TimeUnit.HOURS, "time"));
+ // Should support conversion from all data types
+ assertEquals(converter.toMillisSinceEpoch(123), 123 *
TimeUnit.HOURS.toMillis(5));
+ assertEquals(converter.toMillisSinceEpoch(123L), 123 *
TimeUnit.HOURS.toMillis(5));
+ assertEquals(converter.toMillisSinceEpoch(123f), 123 *
TimeUnit.HOURS.toMillis(5));
+ assertEquals(converter.toMillisSinceEpoch(123d), 123 *
TimeUnit.HOURS.toMillis(5));
+ assertEquals(converter.toMillisSinceEpoch("123"), 123 *
TimeUnit.HOURS.toMillis(5));
+
+ assertEquals(converter.fromMillisSinceEpoch(123 *
TimeUnit.HOURS.toMillis(5)), 123);
+ }
+
+ @Test
+ public void testLongTimeColumn() {
+ TimeConverter converter =
+ new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.LONG, 10,
TimeUnit.DAYS, "time"));
+ // Should support conversion from all data types
+ assertEquals(converter.toMillisSinceEpoch(123), 123 *
TimeUnit.DAYS.toMillis(10));
+ assertEquals(converter.toMillisSinceEpoch(123L), 123 *
TimeUnit.DAYS.toMillis(10));
+ assertEquals(converter.toMillisSinceEpoch(123f), 123 *
TimeUnit.DAYS.toMillis(10));
+ assertEquals(converter.toMillisSinceEpoch(123d), 123 *
TimeUnit.DAYS.toMillis(10));
+ assertEquals(converter.toMillisSinceEpoch("123"), 123 *
TimeUnit.DAYS.toMillis(10));
+
+ assertEquals(converter.fromMillisSinceEpoch(123 *
TimeUnit.DAYS.toMillis(10)), 123L);
+ }
+
+ @Test
+ public void testFloatTimeColumn() {
+ TimeConverter converter =
+ new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.FLOAT, 1,
TimeUnit.SECONDS, "time"));
+ // Should support conversion from all data types
+ assertEquals(converter.toMillisSinceEpoch(123), 123 *
TimeUnit.SECONDS.toMillis(1));
+ assertEquals(converter.toMillisSinceEpoch(123L), 123 *
TimeUnit.SECONDS.toMillis(1));
+ assertEquals(converter.toMillisSinceEpoch(123f), 123 *
TimeUnit.SECONDS.toMillis(1));
+ assertEquals(converter.toMillisSinceEpoch(123d), 123 *
TimeUnit.SECONDS.toMillis(1));
+ assertEquals(converter.toMillisSinceEpoch("123"), 123 *
TimeUnit.SECONDS.toMillis(1));
+
+ assertEquals(converter.fromMillisSinceEpoch(123 *
TimeUnit.SECONDS.toMillis(1)), 123f);
+ }
+
+ @Test
+ public void testDoubleTimeColumn() {
+ TimeConverter converter =
+ new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.DOUBLE,
3, TimeUnit.MINUTES, "time"));
+ // Should support conversion from all data types
+ assertEquals(converter.toMillisSinceEpoch(123), 123 *
TimeUnit.MINUTES.toMillis(3));
+ assertEquals(converter.toMillisSinceEpoch(123L), 123 *
TimeUnit.MINUTES.toMillis(3));
+ assertEquals(converter.toMillisSinceEpoch(123f), 123 *
TimeUnit.MINUTES.toMillis(3));
+ assertEquals(converter.toMillisSinceEpoch(123d), 123 *
TimeUnit.MINUTES.toMillis(3));
+ assertEquals(converter.toMillisSinceEpoch("123"), 123 *
TimeUnit.MINUTES.toMillis(3));
+
+ assertEquals(converter.fromMillisSinceEpoch(123 *
TimeUnit.MINUTES.toMillis(3)), 123d);
+ }
+
+ @Test
+ public void testStringTimeColumn() {
+ TimeConverter converter =
+ new TimeConverter(new TimeGranularitySpec(FieldSpec.DataType.STRING,
100, TimeUnit.MILLISECONDS, "time"));
+ // Should support conversion from all data types
+ assertEquals(converter.toMillisSinceEpoch(123), 123 *
TimeUnit.MILLISECONDS.toMillis(100));
+ assertEquals(converter.toMillisSinceEpoch(123L), 123 *
TimeUnit.MILLISECONDS.toMillis(100));
+ assertEquals(converter.toMillisSinceEpoch(123f), 123 *
TimeUnit.MILLISECONDS.toMillis(100));
+ assertEquals(converter.toMillisSinceEpoch(123d), 123 *
TimeUnit.MILLISECONDS.toMillis(100));
+ assertEquals(converter.toMillisSinceEpoch("123"), 123 *
TimeUnit.MILLISECONDS.toMillis(100));
+
+ assertEquals(converter.fromMillisSinceEpoch(123 *
TimeUnit.MILLISECONDS.toMillis(100)), "123");
+ }
+}
diff --git
a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
index 2ee8e00..a117982 100644
---
a/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
+++
b/pinot-core/src/main/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformer.java
@@ -19,7 +19,7 @@ import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.common.data.TimeFieldSpec;
import com.linkedin.pinot.common.data.TimeGranularitySpec;
import com.linkedin.pinot.common.utils.time.TimeConverter;
-import com.linkedin.pinot.common.utils.time.TimeConverterProvider;
+import com.linkedin.pinot.common.utils.time.TimeUtils;
import com.linkedin.pinot.core.data.GenericRow;
@@ -29,37 +29,64 @@ import com.linkedin.pinot.core.data.GenericRow;
* column for other record transformers (incoming time column can be ignored).
*/
public class TimeTransformer implements RecordTransformer {
- private final String _incomingTimeColumn;
- private final String _outgoingTimeColumn;
- private final TimeConverter _timeConverter;
+ private String _incomingTimeColumn;
+ private String _outgoingTimeColumn;
+ private TimeConverter _incomingTimeConverter;
+ private TimeConverter _outgoingTimeConverter;
+ private boolean _isValidated;
public TimeTransformer(Schema schema) {
TimeFieldSpec timeFieldSpec = schema.getTimeFieldSpec();
if (timeFieldSpec != null) {
TimeGranularitySpec incomingGranularitySpec =
timeFieldSpec.getIncomingGranularitySpec();
TimeGranularitySpec outgoingGranularitySpec =
timeFieldSpec.getOutgoingGranularitySpec();
+
+ // Perform time conversion only if incoming and outgoing granularity
spec are different
if (!incomingGranularitySpec.equals(outgoingGranularitySpec)) {
_incomingTimeColumn = incomingGranularitySpec.getName();
_outgoingTimeColumn = outgoingGranularitySpec.getName();
- _timeConverter =
TimeConverterProvider.getTimeConverter(incomingGranularitySpec,
outgoingGranularitySpec);
- return;
+ _incomingTimeConverter = new TimeConverter(incomingGranularitySpec);
+ _outgoingTimeConverter = new TimeConverter(outgoingGranularitySpec);
}
}
- _incomingTimeColumn = null;
- _outgoingTimeColumn = null;
- _timeConverter = null;
}
@Override
public GenericRow transform(GenericRow record) {
- if (_timeConverter == null) {
+ if (_incomingTimeColumn == null) {
return record;
}
- // Skip transformation if outgoing value already exist
- // NOTE: outgoing value might already exist for OFFLINE data
- if (record.getValue(_outgoingTimeColumn) == null) {
- record.putField(_outgoingTimeColumn,
_timeConverter.convert(record.getValue(_incomingTimeColumn)));
+
+ Object incomingTimeValue = record.getValue(_incomingTimeColumn);
+ // Validate the time values and determine whether the conversion is needed
+ if (!_isValidated) {
+ // If incoming time value does not exist or the value is invalid after
conversion, check the outgoing time value.
+ // If the outgoing time value is valid, skip time conversion, otherwise,
throw exception.
+ if (incomingTimeValue == null || !TimeUtils.timeValueInValidRange(
+ _incomingTimeConverter.toMillisSinceEpoch(incomingTimeValue))) {
+ Object outgoingTimeValue = record.getValue(_outgoingTimeColumn);
+ if (outgoingTimeValue == null || !TimeUtils.timeValueInValidRange(
+ _outgoingTimeConverter.toMillisSinceEpoch(outgoingTimeValue))) {
+ throw new IllegalStateException(
+ "No valid time value found in either incoming time column: " +
_incomingTimeColumn
+ + " or outgoing time column: " + _outgoingTimeColumn);
+ } else {
+ disableConversion();
+ return record;
+ }
+ }
+ _isValidated = true;
}
+
+ record.putField(_outgoingTimeColumn,
+
_outgoingTimeConverter.fromMillisSinceEpoch(_incomingTimeConverter.toMillisSinceEpoch(incomingTimeValue)));
return record;
}
+
+ private void disableConversion() {
+ _incomingTimeColumn = null;
+ _outgoingTimeColumn = null;
+ _incomingTimeConverter = null;
+ _outgoingTimeConverter = null;
+ }
}
diff --git
a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
index fb5f0fc..6d2940b 100644
---
a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
+++
b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/RecordTransformerTest.java
@@ -20,7 +20,6 @@ import com.linkedin.pinot.common.data.Schema;
import com.linkedin.pinot.core.data.GenericRow;
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import org.testng.annotations.Test;
import static org.testng.Assert.*;
@@ -41,9 +40,6 @@ public class RecordTransformerTest {
// For sanitation
.addSingleValueDimension("svStringWithNullCharacters",
FieldSpec.DataType.STRING)
.addSingleValueDimension("svStringWithLengthLimit",
FieldSpec.DataType.STRING)
- // For time conversion
- .addTime("incoming", 6, TimeUnit.HOURS, FieldSpec.DataType.INT,
"outgoing", 1, TimeUnit.MILLISECONDS,
- FieldSpec.DataType.LONG)
.build();
static {
@@ -67,25 +63,11 @@ public class RecordTransformerTest {
fields.put("mvDouble", new Object[]{123});
fields.put("svStringWithNullCharacters", "1\0002\0003");
fields.put("svStringWithLengthLimit", "123");
- fields.put("incoming", "123");
record.init(fields);
return record;
}
@Test
- public void testTimeTransformer() {
- RecordTransformer transformer = new TimeTransformer(SCHEMA);
- GenericRow record = getRecord();
- for (int i = 0; i < NUM_ROUNDS; i++) {
- record = transformer.transform(record);
- assertNotNull(record);
- // We keep the incoming time field in case other transformers rely on it
- assertEquals(record.getValue("incoming"), "123");
- assertEquals(record.getValue("outgoing"), 123 * 6 * 3600 * 1000L);
- }
- }
-
- @Test
public void testDataTypeTransformer() {
RecordTransformer transformer = new DataTypeTransformer(SCHEMA);
GenericRow record = getRecord();
@@ -103,11 +85,6 @@ public class RecordTransformerTest {
assertEquals(record.getValue("mvDouble"), new Object[]{123d});
assertEquals(record.getValue("svStringWithNullCharacters"),
"1\0002\0003");
assertEquals(record.getValue("svStringWithLengthLimit"), "123");
- // Incoming time field won't be converted (it's ignored in this
transformer)
- assertEquals(record.getValue("incoming"), "123");
- // Outgoing time field will be converted (without time transformer, this
field will be null before transform, and
- // be filled with default null value after transform)
- assertEquals(record.getValue("outgoing"), Long.MIN_VALUE);
}
}
@@ -141,8 +118,6 @@ public class RecordTransformerTest {
assertEquals(record.getValue("mvDouble"), new Object[]{123d});
assertEquals(record.getValue("svStringWithNullCharacters"), "1");
assertEquals(record.getValue("svStringWithLengthLimit"), "12");
- assertEquals(record.getValue("incoming"), "123");
- assertEquals(record.getValue("outgoing"), 123 * 6 * 3600 * 1000L);
}
}
diff --git
a/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java
b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java
new file mode 100644
index 0000000..5b0e5a3
--- /dev/null
+++
b/pinot-core/src/test/java/com/linkedin/pinot/core/data/recordtransformer/TimeTransformerTest.java
@@ -0,0 +1,146 @@
+/**
+ * Copyright (C) 2014-2018 LinkedIn Corp. ([email protected])
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.linkedin.pinot.core.data.recordtransformer;
+
+import com.linkedin.pinot.common.data.FieldSpec;
+import com.linkedin.pinot.common.data.Schema;
+import com.linkedin.pinot.common.data.TimeFieldSpec;
+import com.linkedin.pinot.common.data.TimeGranularitySpec;
+import com.linkedin.pinot.core.data.GenericRow;
+import java.util.concurrent.TimeUnit;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+import org.testng.annotations.Test;
+
+import static org.testng.Assert.*;
+
+
+public class TimeTransformerTest {
+ private static final long VALID_TIME = new DateTime(2018, 1, 1, 0, 0,
DateTimeZone.UTC).getMillis();
+ private static final long INVALID_TIME = new DateTime(2200, 1, 1, 0, 0,
DateTimeZone.UTC).getMillis();
+
+ @Test
+ public void testTimeFormat() {
+ // When incoming and outgoing spec are the same, any time format should
work
+ Schema schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
+ TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(),
"time")));
+ TimeTransformer transformer = new TimeTransformer(schema);
+ GenericRow record = new GenericRow();
+ record.putField("time", 20180101);
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("time"), 20180101);
+
+ // When incoming and outgoing spec are not the same, simple date format is
not allowed
+ schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS,
+ TimeGranularitySpec.TimeFormat.SIMPLE_DATE_FORMAT.toString(),
"incoming"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG, TimeUnit.SECONDS,
"outgoing")));
+ try {
+ new TimeTransformer(schema);
+ fail();
+ } catch (IllegalArgumentException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testSkipConversion() {
+ // When incoming time does not exist or is invalid, outgoing time exists
and is valid, skip conversion
+ Schema schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "incoming"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "outgoing")));
+ TimeTransformer transformer = new TimeTransformer(schema);
+ GenericRow record = new GenericRow();
+ record.putField("outgoing", VALID_TIME);
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("outgoing"), VALID_TIME);
+
+ transformer = new TimeTransformer(schema);
+ record = new GenericRow();
+ record.putField("incoming", INVALID_TIME);
+ record.putField("outgoing", VALID_TIME);
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("outgoing"), VALID_TIME);
+
+ // When incoming and outgoing time column is the same, and the value is
already converted, skip conversion
+ schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "time"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time")));
+ transformer = new TimeTransformer(schema);
+ record = new GenericRow();
+ record.putField("time", VALID_TIME);
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("time"), VALID_TIME);
+
+ // When both incoming and outgoing time do not exist or are invalid, throw
exception
+ schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "incoming"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "outgoing")));
+ transformer = new TimeTransformer(schema);
+ record = new GenericRow();
+ record.putField("incoming", INVALID_TIME);
+ record.putField("outgoing", INVALID_TIME);
+ try {
+ transformer.transform(record);
+ fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+
+ schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "time"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time")));
+ transformer = new TimeTransformer(schema);
+ record = new GenericRow();
+ record.putField("time", INVALID_TIME);
+ try {
+ transformer.transform(record);
+ fail();
+ } catch (IllegalStateException e) {
+ // Expected
+ }
+ }
+
+ @Test
+ public void testTimeConversion() {
+ // When incoming time exists and is valid, do the conversion
+ Schema schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "incoming"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "outgoing")));
+ TimeTransformer transformer = new TimeTransformer(schema);
+ GenericRow record = new GenericRow();
+ record.putField("incoming", TimeUnit.MILLISECONDS.toDays(VALID_TIME));
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("outgoing"), VALID_TIME);
+
+ // When incoming and outgoing time column is the same, and the value is
not yet converted, do the conversion
+ schema = new Schema();
+ schema.addField(new TimeFieldSpec(new
TimeGranularitySpec(FieldSpec.DataType.INT, TimeUnit.DAYS, "time"),
+ new TimeGranularitySpec(FieldSpec.DataType.LONG,
TimeUnit.MILLISECONDS, "time")));
+ transformer = new TimeTransformer(schema);
+ record = new GenericRow();
+ record.putField("time", TimeUnit.MILLISECONDS.toDays(VALID_TIME));
+ record = transformer.transform(record);
+ assertNotNull(record);
+ assertEquals(record.getValue("time"), VALID_TIME);
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]