snleee commented on a change in pull request #4737: Druid-Pinot Segment Converter Tool URL: https://github.com/apache/incubator-pinot/pull/4737#discussion_r346579861
########## File path: contrib/pinot-druid-migration/src/main/java/org/apache/pinot/druid/data/readers/DruidSegmentRecordReader.java ########## @@ -0,0 +1,252 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pinot.druid.data.readers; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.base.Function; +import com.google.common.base.Preconditions; +import java.io.File; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import java.util.stream.Collectors; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import org.apache.druid.jackson.DefaultObjectMapper; +import org.apache.druid.java.util.common.granularity.Granularities; +import org.apache.druid.java.util.common.guava.Sequence; +import org.apache.druid.java.util.common.guava.Sequences; +import org.apache.druid.segment.BaseObjectColumnValueSelector; +import org.apache.druid.segment.ColumnSelectorFactory; +import org.apache.druid.segment.Cursor; +import org.apache.druid.segment.IndexIO; +import org.apache.druid.segment.QueryableIndex; +import org.apache.druid.segment.QueryableIndexStorageAdapter; +import org.apache.druid.segment.VirtualColumns; +import 
org.apache.druid.segment.column.ColumnCapabilities; +import org.apache.druid.segment.column.ColumnConfig; +import org.apache.druid.segment.column.ValueType; +import org.apache.druid.segment.filter.Filters; +import org.apache.druid.query.DruidProcessingConfig; +import org.apache.pinot.common.data.FieldSpec; +import org.apache.pinot.common.data.Schema; +import org.apache.pinot.core.data.GenericRow; +import org.apache.pinot.core.data.readers.RecordReader; +import org.apache.pinot.core.indexsegment.generator.SegmentGeneratorConfig; +import org.apache.pinot.core.segment.creator.impl.V1Constants; +import org.joda.time.chrono.ISOChronology; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +/** + * The DruidSegmentRecordReader allows us to convert all of the rows in a Druid segment file + * into GenericRows, which are made into Pinot segments. + * + * Note that Druid uses LONG and not INT, so construct the Pinot schema accordingly. + */ +public class DruidSegmentRecordReader implements RecordReader { + private static final Logger LOGGER = LoggerFactory.getLogger(DruidSegmentRecordReader.class); + + private Schema _pinotSchema; + private ArrayList<String> _columnNames; + private Cursor _cursor; + private List<BaseObjectColumnValueSelector> _selectors; + private QueryableIndex _index; + + public DruidSegmentRecordReader(@Nonnull File indexDir, @Nullable Schema schema) + throws IOException { + init(indexDir, schema); + } + + private void init(File indexDir, Schema schema) + throws IOException { + // Only the columns whose names are in the Pinot schema will get processed + _pinotSchema = schema; + + ColumnConfig config = new DruidProcessingConfig() { + @Override + public String getFormatString() { + return "processing-%s"; + } + + @Override + public int intermediateComputeSizeBytes() { + return 100 * 1024 * 1024; + } + + @Override + public int getNumThreads() { + return 1; + } + + @Override + public int columnCacheSizeBytes() { + return 25 * 1024 * 1024; + } + }; 
+ + ObjectMapper mapper = new DefaultObjectMapper(); + final IndexIO indexIO = new IndexIO(mapper, config); + _index = indexIO.loadIndex(indexDir); + QueryableIndexStorageAdapter adapter = new QueryableIndexStorageAdapter(_index); + + // A Sequence "represents an iterable sequence of elements. Unlike normal Iterators however, it doesn't expose + // a way for you to extract values from it, instead you provide it with a worker (an Accumulator) and that defines + // what happens with the data." + final Sequence<Cursor> cursors = adapter.makeCursors( + Filters.toFilter(null), + _index.getDataInterval().withChronology(ISOChronology.getInstanceUTC()), + VirtualColumns.EMPTY, + Granularities.ALL, + false, + null + ); + + List<Cursor> cursorList = new ArrayList<>(); + final Sequence<Object> sequence = Sequences.map(cursors, new Function<Cursor, Object>() { + @Override + public Object apply(@Nullable Cursor cursor) { + cursorList.add(cursor); + return null; + } + }); + sequence.accumulate(null, (accumulated, in) -> null); + + // There should only be one single Cursor for every segment, so there should only be one Cursor in the cursorList + Preconditions.checkArgument(cursorList.size() == 1, "There should only be one Cursor in the Sequence."); + _cursor = cursorList.get(0); + + _columnNames = new ArrayList<>(); + _columnNames.addAll(_pinotSchema.getColumnNames()); + validateColumns(adapter); + + ColumnSelectorFactory columnSelectorFactory = _cursor.getColumnSelectorFactory(); + _selectors = _columnNames + .stream() + .map(columnSelectorFactory::makeColumnValueSelector) + .collect(Collectors.toList()); + } + + //@Override + public void init(SegmentGeneratorConfig segmentGeneratorConfig) { + + } + + @Override + public boolean hasNext() { + return !_cursor.isDone(); + } + + @Override + public GenericRow next() throws IOException { + return next(new GenericRow()); + } + + @Override + public GenericRow next(GenericRow reuse) { + // Only the columns whose names are in the Pinot 
schema will get processed + for (int i = 0; i < _columnNames.size(); i++) { + final String columnName = _columnNames.get(i); + BaseObjectColumnValueSelector selector = _selectors.get(i); + // If the column does not exist in the segment file, skip it. + if (selector != null) { + FieldSpec fieldSpec = _pinotSchema.getFieldSpecFor(_columnNames.get(i)); + Object value = selector.getObject(); + if (value != null && !fieldSpec.isSingleValueField()) { + // Multi-valued dimensions in Druid are stored as Arrays.ArrayList (this has been checked) + Preconditions.checkArgument(value.getClass() == java.util.Arrays.asList().getClass(), "The multi-valued dimension " + columnName + + " should be java.util.Arrays$ArrayList, but it is " + value.getClass()); + // Store the multi-valued dimension as a String[] to follow Pinot format; null if empty + value = ((List<String>) value).toArray(new String[0]); + } + reuse.putField(columnName, value); + } + } + _cursor.advance(); + return reuse; + } + + @Override + public void rewind() { Review comment: Can we add a test to verify that `rewind()` works correctly? ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] With regards, Apache Git Services --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
