krishan1390 commented on code in PR #17304:
URL: https://github.com/apache/pinot/pull/17304#discussion_r2642224186
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java:
##########
@@ -58,56 +49,6 @@ public DefaultValueColumnReader(String columnName, int numDocs, FieldSpec fieldS
     _numDocs = numDocs;
     _currentIndex = 0;
     _dataType = fieldSpec.getDataType();
-
-    // For multi-value fields, wrap the default value in an array
Review Comment:
Removed this for now. I will probably create a separate default reader to
preserve backward compatibility; I don't see any other way.
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/segment/readers/DefaultValueColumnReader.java:
##########
Review Comment:
Removed this for now. I will probably create a separate default reader to
preserve backward compatibility; I don't see any other way.
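Roughly what I have in mind for that separate reader (the class name, constructor,
and getValue method below are placeholders, not the final code); it would just keep
the old behavior of wrapping the default value in an array for multi-value fields:

import org.apache.pinot.spi.data.FieldSpec;

// Placeholder sketch of a separate reader that preserves the old behavior;
// the class shape is hypothetical, not the final implementation.
public class BackwardCompatibleDefaultValueColumnReader {
  private final Object _defaultValue;

  public BackwardCompatibleDefaultValueColumnReader(FieldSpec fieldSpec) {
    Object defaultNullValue = fieldSpec.getDefaultNullValue();
    // For multi-value fields, wrap the default value in an array (old behavior)
    _defaultValue = fieldSpec.isSingleValueField() ? defaultNullValue : new Object[]{defaultNullValue};
  }

  public Object getValue(int docId) {
    // Every doc gets the same default, since this reader only backfills a missing column
    return _defaultValue;
  }
}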
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, _destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data type for column: " + columnName, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for column: " + columnName, e);
Review Comment:
done
##########
pinot-segment-local/src/main/java/org/apache/pinot/segment/local/columntransformer/DataTypeColumnTransformer.java:
##########
@@ -0,0 +1,99 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.segment.local.columntransformer;
+
+import org.apache.pinot.common.utils.PinotDataType;
+import org.apache.pinot.common.utils.ThrottledLogger;
+import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
+import org.apache.pinot.spi.columntransformer.ColumnTransformer;
+import org.apache.pinot.spi.config.table.TableConfig;
+import org.apache.pinot.spi.config.table.ingestion.IngestionConfig;
+import org.apache.pinot.spi.data.FieldSpec;
+import org.apache.pinot.spi.data.readers.ColumnReader;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class DataTypeColumnTransformer implements ColumnTransformer {
+
+  private static final Logger LOGGER = LoggerFactory.getLogger(DataTypeColumnTransformer.class);
+
+  private final PinotDataType _destDataType;
+  private final ColumnReader _columnReader;
+  private final boolean _continueOnError;
+  private final ThrottledLogger _throttledLogger;
+
+  /**
+   * @param fieldSpec - The field spec for the column being created in Pinot.
+   * @param columnReader - The column reader to read the source data.
+   */
+  public DataTypeColumnTransformer(TableConfig tableConfig, FieldSpec fieldSpec, ColumnReader columnReader) {
+    _destDataType = PinotDataType.getPinotDataTypeForIngestion(fieldSpec);
+    _columnReader = columnReader;
+    IngestionConfig ingestionConfig = tableConfig.getIngestionConfig();
+    _continueOnError = ingestionConfig != null && ingestionConfig.isContinueOnError();
+    _throttledLogger = new ThrottledLogger(LOGGER, ingestionConfig);
+  }
+
+  @Override
+  public boolean isNoOp() {
+    // If source and destination data types are primitive types and the same, no transformation is needed.
+    if (_columnReader.isSingleValue()) {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING);
+      }
+    } else {
+      if (_columnReader.isInt()) {
+        return _destDataType.equals(PinotDataType.INTEGER_ARRAY);
+      } else if (_columnReader.isLong()) {
+        return _destDataType.equals(PinotDataType.LONG_ARRAY);
+      } else if (_columnReader.isFloat()) {
+        return _destDataType.equals(PinotDataType.FLOAT_ARRAY);
+      } else if (_columnReader.isDouble()) {
+        return _destDataType.equals(PinotDataType.DOUBLE_ARRAY);
+      } else if (_columnReader.isString()) {
+        return _destDataType.equals(PinotDataType.STRING_ARRAY);
+      }
+    }
+    // For other types, because there is no overhead to cast to Object, always call transform() which handles all cases
+    return false;
+  }
+
+  @Override
+  public Object transform(Object value) {
+    String columnName = _columnReader.getColumnName();
+    try {
+      return DataTypeTransformerUtils.transformValue(columnName, value, _destDataType);
+    } catch (Exception e) {
+      if (!_continueOnError) {
+        throw new RuntimeException("Caught exception while transforming data type for column: " + columnName, e);
+      }
+      _throttledLogger.warn("Caught exception while transforming data type for column: " + columnName, e);
+      return null;
Review Comment:
Yes, it's similar to the row-level transform, where we update the value to null
and then the null transformer kicks in and applies the default value.
record.markIncomplete() is used only for stats; even in complex transforms it is
used only for stats. I agree stats are useful, but it's going to be complicated to
add them here. Also, the stats would now be per column rather than per table, so we
can't add metrics and can only log. From a logging perspective we already have the
logs, and the throttled logger will also log the count of dropped log messages.
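To illustrate the per-value semantics I mean when continueOnError is enabled (a
sketch only, not the actual pipeline code; DataTypeTransformerUtils.transformValue
is the util from this PR, while the class and helper method here are illustrative):

import org.apache.pinot.common.utils.PinotDataType;
import org.apache.pinot.segment.local.utils.DataTypeTransformerUtils;
import org.apache.pinot.spi.data.FieldSpec;

public class NullThenDefaultSketch {
  // Sketch: a failed conversion yields null, and the null-handling step
  // later substitutes the column's default value.
  static Object transformOrDefault(String columnName, Object rawValue, PinotDataType destType, FieldSpec fieldSpec) {
    Object transformed;
    try {
      transformed = DataTypeTransformerUtils.transformValue(columnName, rawValue, destType);
    } catch (Exception e) {
      transformed = null;  // continueOnError: keep going with a null value instead of failing
    }
    // Later the null transformer kicks in and applies the column's default value
    return transformed != null ? transformed : fieldSpec.getDefaultNullValue();
  }
}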
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]