bvaradar commented on a change in pull request #1678: URL: https://github.com/apache/hudi/pull/1678#discussion_r435985550
########## File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/selector/BootstrapRegexModeSelector.java ########## @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.bootstrap.selector; + +import java.util.List; +import java.util.Map; +import java.util.regex.Pattern; +import java.util.stream.Collectors; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; + +public class BootstrapRegexModeSelector extends BootstrapModeSelector { Review comment: For Mixed mode bootstrapping, this class can be used to select few partitions for full bootstrap while configuring metadata bootstrap for others. ########## File path: .travis.yml ########## @@ -13,14 +13,19 @@ # See the License for the specific language governing permissions and # limitations under the License. +os: linux language: java dist: trusty jdk: - openjdk8 -sudo: required -env: - - HUDI_QUIETER_LOGGING=1 TEST_SUITE=unit - - TEST_SUITE=integration +jobs: Review comment: Seeing timeout to CI runs. 
Went into a hole trying to reuse Spark Context. Ended up splitting CI run for hudi-client and others. This is from #1619 ########## File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/BootstrapWriteStatus.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.bootstrap; + +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.common.model.BootstrapSourceFileMapping; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.util.collection.Pair; + +/** + * WriteStatus for Bootstrap. + */ +public class BootstrapWriteStatus extends WriteStatus { Review comment: Used for metadata bootstrap. Contains information needed to create bootstrap index. ########## File path: hudi-client/src/main/java/org/apache/hudi/client/HoodieWriteClient.java ########## @@ -146,6 +146,35 @@ public static SparkConf registerClasses(SparkConf conf) { return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown()); } + /** + * Main API to run bootstrap to hudi. 
+ */ + public void bootstrap(Option<Map<String, String>> extraMetadata) { + if (rollbackPending) { + rollBackPendingBootstrap(); + } + HoodieTable<T> table = getTableAndInitCtx(WriteOperationType.UPSERT); + table.bootstrap(jsc, extraMetadata); + } + + /** + * Main API to rollback pending bootstrap. + */ + protected void rollBackPendingBootstrap() { Review comment: Internal method made visible for testing. Bootstrap rollback is special as it would have to revert 2 commits (when both metadata and full bootstrap are configured). ########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator; +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Bootstrap specific configs. 
+ */ +public class HoodieBootstrapConfig extends DefaultHoodieConfig { + + public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path"; Review comment: External Source Path ########## File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenerator.java ########## @@ -0,0 +1,93 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; Review comment: Moved base class to hudi-client as this is needed for metadata bootstrap ########## File path: hudi-client/src/main/java/org/apache/hudi/client/bootstrap/translator/MetadataBootstrapPartitionPathTranslator.java ########## @@ -16,28 +16,23 @@ * limitations under the License. */ -package org.apache.hudi.keygen; - -import org.apache.hudi.common.config.TypedProperties; -import org.apache.hudi.common.model.HoodieKey; - -import org.apache.avro.generic.GenericRecord; +package org.apache.hudi.client.bootstrap.translator; import java.io.Serializable; +import org.apache.hudi.common.config.TypedProperties; -/** - * Abstract class to extend for plugging in extraction of {@link HoodieKey} from an Avro record. 
- */ -public abstract class KeyGenerator implements Serializable { +public abstract class MetadataBootstrapPartitionPathTranslator implements Serializable { Review comment: HUDI-1001 to add implementations ########## File path: hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java ########## @@ -52,11 +53,18 @@ private static final String COW_TABLE_NAME = "stock_ticks_cow"; private static final String MOR_TABLE_NAME = "stock_ticks_mor"; + private static final String BOOTSTRAPPED_SRC_PATH = "/user/hive/warehouse/stock_ticks_cow_bs_src"; Review comment: What we do here: once the first round of ingestion is completed for the COW and MOR tables, we create a new parquet dataset by reading from COW. We then use this new parquet dataset to perform metadata bootstrap onto a new COW and MOR table. Hive and Spark SQL queries are run against these tables. We also perform upserts and compaction (MOR) on these metadata bootstrap tables and test through hive and spark-sql. Presto needs to be added after we support it. ########## File path: packaging/hudi-hadoop-mr-bundle/pom.xml ########## @@ -72,9 +72,19 @@ <include>org.objenesis:objenesis</include> <include>com.esotericsoftware:minlog</include> <include>org.apache.avro:avro</include> + <include>org.apache.hbase:hbase-common</include> Review comment: We need to add all these to the bundles in order to include HFile at runtime across query engines (fyi : @prashantwason ) ########## File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java ########## @@ -122,20 +145,37 @@ public void shutdownGracefully() { * @throws Exception */ public void sync() throws Exception { - if (cfg.continuousMode) { - deltaSyncService.start(this::onDeltaSyncShutdown); - deltaSyncService.waitForShutdown(); - LOG.info("Delta Sync shutting down"); + if (bootstrapExecutor.isPresent()) { Review comment: Users are expected to run bootstrap executor in a stand-alone mode. 
Then they can switch to continuous mode for subsequent ingestion. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/HoodieTimeline.java ########## @@ -81,6 +81,13 @@ String INVALID_INSTANT_TS = "0"; + // Instant corresponding to pristine state of the table after its creation Review comment: I am using a value higher than INIT_INSTANT_TS for bootstrap instants so that we will have uniform semantics for incrementally reading bootstrap vs non-bootstrap commits. ########## File path: hudi-cli/src/test/java/org/apache/hudi/cli/commands/TestRepairsCommand.java ########## @@ -164,7 +164,7 @@ public void testOverwriteHoodieProperties() throws IOException { // check result List<String> allPropsStr = Arrays.asList("hoodie.table.name", "hoodie.table.type", - "hoodie.archivelog.folder", "hoodie.timeline.layout.version"); + "hoodie.archivelog.folder", "hoodie.bootstrap.index.class", "hoodie.timeline.layout.version"); Review comment: Bootstrap Index implementation is pluggable per request from AWS. ########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.config; + +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator; +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Bootstrap specific configs. + */ +public class HoodieBootstrapConfig extends DefaultHoodieConfig { + + public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path"; + public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector"; + public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider"; + public static final String BOOTSTRAP_KEYGEN_CLASS = "hoodie.bootstrap.keygen.class"; Review comment: Note: Same keygen class needs to be configured as the one used for regular upserts to have a consistent upsert behavior after bootstrap. ########## File path: hudi-client/src/main/java/org/apache/hudi/client/utils/MergingParquetIterator.java ########## @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.client.utils; + +import java.util.Iterator; +import java.util.function.Function; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.common.util.collection.Pair; + +public class MergingParquetIterator<T extends GenericRecord> implements Iterator<T> { Review comment: Used to provide stitched records from older file slice for upsert/compact on metadata bootstrapped table. ########## File path: hudi-client/src/main/java/org/apache/hudi/table/action/commit/BaseCommitActionExecutor.java ########## @@ -173,23 +173,26 @@ protected void updateIndexAndCommitIfNeeded(JavaRDD<WriteStatus> writeStatusRDD, protected void commitOnAutoCommit(HoodieWriteMetadata result) { if (config.shouldAutoCommit()) { LOG.info("Auto commit enabled: Committing " + instantTime); - commit(Option.empty(), result); + commit(extraMetadata, result); } else { LOG.info("Auto commit disabled for " + instantTime); } } - private void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) { + protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) { Review comment: Metadata bootstrap needs Write Statuses for creating bootstrap index. But, we also need to ensure collect on the Write Status RDD is done only once. Hence, this change. ########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java ########## @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator; +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Bootstrap specific configs. + */ +public class HoodieBootstrapConfig extends DefaultHoodieConfig { + + public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path"; + public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector"; Review comment: Configure class which decides partitions for doing metadata/full bootstrap. ########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java ########## @@ -118,14 +119,17 @@ "_.hoodie.allow.multi.write.on.same.instant"; private static final String DEFAULT_ALLOW_MULTI_WRITE_ON_SAME_INSTANT = "false"; + public static final String EXTERNAL_RECORD_AND_SCHEMA_TRANSFORMATION = AVRO_SCHEMA + ".externalTransformation"; Review comment: Parquet-avro has a bug when dealing with nested fields which are lists. It looks like Parquet-avro changes the schema field name of this nested field. 
Schema used to write : https://gist.github.com/bvaradar/fb7af49258b8bfc232607a06d3adc8db#file-writer-schema-nested-array-schema-L70 Schema returned by Parquet-Avro : https://gist.github.com/bvaradar/c69d9af4cc7b62bf81e4cf938d0ba9ca#file-same-schema-when-read-from-parquet-L72 Using explicit record translation to new schema in Hudi gets it to work. Logic in MergeHelper.java ########## File path: hudi-client/src/main/java/org/apache/hudi/io/HoodieBootstrapHandle.java ########## @@ -0,0 +1,42 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.io; + +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.client.SparkTaskContextSupplier; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.table.HoodieTable; + +public class HoodieBootstrapHandle<T extends HoodieRecordPayload> extends HoodieCreateHandle<T> { Review comment: Used by metadata bootstrap to ensure the schema is set correctly and skeleton and external files is 1-1. 
########## File path: hudi-client/src/main/java/org/apache/hudi/keygen/KeyGenUtils.java ########## @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.keygen; + +import java.io.UnsupportedEncodingException; +import java.net.URLEncoder; +import java.nio.charset.StandardCharsets; +import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieKeyException; + +public class KeyGenUtils { Review comment: Most KeyGenerator classes have boilerplate code. I have refactored them so that it is handled consistently. Also, I needed separate APIs to get only the record key (not partition-path) for Metadata Bootstrap. Made changes in KeyGenerator classes for that. 
########## File path: hudi-cli/src/main/java/org/apache/hudi/cli/commands/StatsCommand.java ########## @@ -124,13 +125,15 @@ public String fileSizeStats( Histogram globalHistogram = new Histogram(new UniformReservoir(MAX_FILES)); HashMap<String, Histogram> commitHistoMap = new HashMap<>(); for (FileStatus fileStatus : statuses) { - String instantTime = FSUtils.getCommitTime(fileStatus.getPath().getName()); - long sz = fileStatus.getLen(); - if (!commitHistoMap.containsKey(instantTime)) { - commitHistoMap.put(instantTime, new Histogram(new UniformReservoir(MAX_FILES))); + if (!fileStatus.getPath().toString().contains(HoodieTableMetaClient.METAFOLDER_NAME)) { Review comment: CLI Tests caught this issue !! .bootstrap folder is under .hoodie which shows up in this listing ########## File path: hudi-client/src/main/java/org/apache/hudi/table/HoodieCopyOnWriteTable.java ########## @@ -155,23 +161,10 @@ public HoodieWriteMetadata compact(JavaSparkContext jsc, String compactionInstan throw new HoodieUpsertException( "Error in finding the old file path at commit " + instantTime + " for fileId: " + fileId); } else { - AvroReadSupport.setAvroReadSchema(getHadoopConf(), upsertHandle.getWriterSchema()); - BoundedInMemoryExecutor<GenericRecord, GenericRecord, Void> wrapper = null; - try (ParquetReader<IndexedRecord> reader = - AvroParquetReader.<IndexedRecord>builder(upsertHandle.getOldFilePath()).withConf(getHadoopConf()).build()) { - wrapper = new SparkBoundedInMemoryExecutor(config, new ParquetReaderIterator(reader), - new UpdateHandler(upsertHandle), x -> x); - wrapper.execute(); - } catch (Exception e) { - throw new HoodieException(e); - } finally { - upsertHandle.close(); - if (null != wrapper) { - wrapper.shutdownNow(); - } - } + MergeHelper.runMerge(this, upsertHandle); Review comment: Moved to avoid duplication in handling merge for both compaction and COW upsert ########## File path: hudi-client/src/main/java/org/apache/hudi/config/HoodieBootstrapConfig.java ########## 
@@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.config; + +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.selector.MetadataOnlyBootstrapModeSelector; +import org.apache.hudi.client.bootstrap.translator.IdentityBootstrapPathTranslator; +import org.apache.hudi.common.config.DefaultHoodieConfig; + +import java.io.File; +import java.io.FileReader; +import java.io.IOException; +import java.util.Properties; + +/** + * Bootstrap specific configs. + */ +public class HoodieBootstrapConfig extends DefaultHoodieConfig { + + public static final String SOURCE_BASE_PATH_PROP = "hoodie.bootstrap.source.base.path"; + public static final String BOOTSTRAP_MODE_SELECTOR = "hoodie.bootstrap.mode.selector"; + public static final String FULL_BOOTRAP_INPUT_PROVIDER = "hoodie.bootstrap.full.input.provider"; Review comment: Configure class which provides RDD given a list of partitions. Defaults are provided at DeltaStreamer/Spark-DataSource level. 
@vinothchandar : This doesn't have to be configurable, but it is kept this way as a form of dependency injection to avoid moving SparkDataSourceBasedFullBootstrapInputProvider and all its dependencies from hudi-spark to hudi-client. We would need a discussion to come up with a cleaner approach. ########## File path: hudi-cli/pom.xml ########## @@ -202,6 +237,12 @@ <type>test-jar</type> </dependency> + <dependency> Review comment: Hudi-utilities-bundle shades hbase classes which also ends up in bootstrap file itself (key class is added in footer) and hence we need to use bundle even in CLI when doing any operations involving bootstrap table. ########## File path: hudi-client/src/main/java/org/apache/hudi/table/action/bootstrap/BootstrapCommitActionExecutor.java ########## @@ -0,0 +1,532 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hudi.table.action.bootstrap; + +import java.io.Serializable; +import java.util.HashMap; +import java.util.HashSet; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hudi.avro.HoodieAvroUtils; +import org.apache.hudi.avro.model.HoodieFileStatus; +import org.apache.hudi.client.WriteStatus; +import org.apache.hudi.client.bootstrap.MetadataBootstrapKeyGenerator; +import org.apache.hudi.client.bootstrap.BootstrapMode; +import org.apache.hudi.client.bootstrap.BootstrapRecordPayload; +import org.apache.hudi.client.bootstrap.BootstrapSourceSchemaProvider; +import org.apache.hudi.client.bootstrap.BootstrapWriteStatus; +import org.apache.hudi.client.bootstrap.FullBootstrapInputProvider; +import org.apache.hudi.client.bootstrap.selector.BootstrapModeSelector; +import org.apache.hudi.client.utils.ParquetReaderIterator; +import org.apache.hudi.common.bootstrap.FileStatusUtils; +import org.apache.hudi.common.bootstrap.index.BootstrapIndex; +import org.apache.hudi.common.config.SerializableConfiguration; +import org.apache.hudi.common.config.TypedProperties; +import org.apache.hudi.common.fs.FSUtils; +import org.apache.hudi.common.model.BootstrapSourceFileMapping; +import org.apache.hudi.common.model.HoodieBaseFile; +import org.apache.hudi.common.model.HoodieKey; +import org.apache.hudi.common.model.HoodiePartitionMetadata; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.model.HoodieRecordPayload; +import org.apache.hudi.common.model.HoodieWriteStat; +import org.apache.hudi.common.model.WriteOperationType; +import org.apache.hudi.common.table.HoodieTableMetaClient; +import org.apache.hudi.common.table.timeline.HoodieInstant; +import org.apache.hudi.common.table.timeline.HoodieInstant.State; +import org.apache.hudi.common.table.timeline.HoodieTimeline; +import org.apache.hudi.common.table.view.HoodieTableFileSystemView; +import org.apache.hudi.common.util.Option; 
+import org.apache.hudi.common.util.ReflectionUtils; +import org.apache.hudi.common.util.ValidationUtils; +import org.apache.hudi.common.util.collection.Pair; +import org.apache.hudi.common.util.queue.BoundedInMemoryExecutor; +import org.apache.hudi.config.HoodieWriteConfig; +import org.apache.hudi.exception.HoodieException; +import org.apache.hudi.exception.HoodieIOException; +import org.apache.hudi.exception.TableNotFoundException; +import org.apache.hudi.execution.SparkBoundedInMemoryExecutor; +import org.apache.hudi.io.HoodieBootstrapHandle; +import org.apache.hudi.table.HoodieTable; +import org.apache.hudi.table.WorkloadProfile; +import org.apache.hudi.table.action.HoodieWriteMetadata; +import org.apache.hudi.table.action.commit.BaseCommitActionExecutor; +import org.apache.hudi.table.action.commit.BulkInsertCommitActionExecutor; + +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.generic.IndexedRecord; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.PathFilter; +import org.apache.hudi.table.action.commit.CommitActionExecutor; +import org.apache.log4j.LogManager; +import org.apache.log4j.Logger; +import org.apache.parquet.avro.AvroParquetReader; +import org.apache.parquet.avro.AvroReadSupport; +import org.apache.parquet.avro.AvroSchemaConverter; +import org.apache.parquet.format.converter.ParquetMetadataConverter; +import org.apache.parquet.hadoop.ParquetFileReader; +import org.apache.parquet.hadoop.ParquetReader; +import org.apache.parquet.hadoop.metadata.ParquetMetadata; +import org.apache.parquet.schema.MessageType; +import org.apache.spark.Partitioner; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.api.java.JavaSparkContext; + +import java.io.IOException; +import java.util.Collection; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class 
BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>> + extends BaseCommitActionExecutor<T, HoodieBootstrapWriteMetadata> { + + private static final Logger LOG = LogManager.getLogger(BootstrapCommitActionExecutor.class); + protected String bootstrapSchema = null; + private transient FileSystem bootstrapSourceFileSystem; + + public BootstrapCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table, + Option<Map<String, String>> extraMetadata) { + super(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps()) + .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class) + .withBulkInsertParallelism(config.getBootstrapParallelism()) + .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP, + extraMetadata); + bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf); + } + + private void checkArguments() { + ValidationUtils.checkArgument(config.getBootstrapSourceBasePath() != null, + "Ensure Bootstrap Source Path is set"); + ValidationUtils.checkArgument(config.getBootstrapModeSelectorClass() != null, + "Ensure Bootstrap Partition Selector is set"); + ValidationUtils.checkArgument(config.getBootstrapKeyGeneratorClass() != null, + "Ensure bootstrap key generator class is set"); + } + + @Override + public HoodieBootstrapWriteMetadata execute() { + checkArguments(); + try { + HoodieTableMetaClient metaClient = table.getMetaClient(); + Option<HoodieInstant> completetedInstant = + metaClient.getActiveTimeline().getCommitsTimeline().filterCompletedInstants().lastInstant(); + ValidationUtils.checkArgument(!completetedInstant.isPresent(), + "Active Timeline is expected to be empty for bootstrapped to be performed. 
" + + "If you want to re-bootstrap, please rollback bootstrap first !!"); + Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> partitionSelections = + listAndProcessSourcePartitions(metaClient); + + // First run metadata bootstrap which will implicitly commit + HoodieWriteMetadata metadataResult = metadataBootstrap(partitionSelections.get( + BootstrapMode.METADATA_ONLY_BOOTSTRAP)); + // if there are full bootstrap to be performed, perform that too + HoodieWriteMetadata fullBootstrapResult = + fullBootstrap(partitionSelections.get(BootstrapMode.FULL_BOOTSTRAP)); + return new HoodieBootstrapWriteMetadata(metadataResult, fullBootstrapResult); + } catch (IOException ioe) { + throw new HoodieIOException(ioe.getMessage(), ioe); + } + } + + protected String getSchemaToStoreInCommit() { + return bootstrapSchema; + } + + /** + * Perform Metadata Bootstrap. + * @param partitionFilesList List of partitions and files within that partitions + */ + protected HoodieWriteMetadata metadataBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) { + if (null == partitionFilesList || partitionFilesList.isEmpty()) { + return null; + } + + HoodieTableMetaClient metaClient = table.getMetaClient(); + metaClient.getActiveTimeline().createNewInstant( + new HoodieInstant(State.REQUESTED, metaClient.getCommitActionType(), + HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS)); + + table.getActiveTimeline().transitionRequestedToInflight(new HoodieInstant(State.REQUESTED, + metaClient.getCommitActionType(), HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS), Option.empty()); + + JavaRDD<BootstrapWriteStatus> bootstrapWriteStatuses = runMetadataBootstrap(partitionFilesList); + + HoodieWriteMetadata result = new HoodieWriteMetadata(); + updateIndexAndCommitIfNeeded(bootstrapWriteStatuses.map(w -> w), result); + return result; + } + + @Override + protected void commit(Option<Map<String, String>> extraMetadata, HoodieWriteMetadata result) { + // Perform bootstrap index write and 
then commit. Make sure both record-key and bootstrap-index + // are all done in a single job DAG. + Map<String, List<Pair<BootstrapSourceFileMapping, HoodieWriteStat>>> bootstrapSourceAndStats = + result.getWriteStatuses().collect().stream() + .map(w -> { + BootstrapWriteStatus ws = (BootstrapWriteStatus) w; + return Pair.of(ws.getBootstrapSourceFileMapping(), ws.getStat()); + }).collect(Collectors.groupingBy(w -> w.getKey().getHudiPartitionPath())); + HoodieTableMetaClient metaClient = table.getMetaClient(); + try (BootstrapIndex.IndexWriter indexWriter = BootstrapIndex.getBootstrapIndex(metaClient) + .createWriter(config.getBootstrapSourceBasePath())) { + LOG.info("Starting to write bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table " + + config.getBasePath()); + indexWriter.begin(); + bootstrapSourceAndStats.forEach((key, value) -> indexWriter.appendNextPartition(key, + value.stream().map(w -> w.getKey()).collect(Collectors.toList()))); + indexWriter.finish(); + LOG.info("Finished writing bootstrap index for source " + config.getBootstrapSourceBasePath() + " in table " + + config.getBasePath()); + } + + super.commit(extraMetadata, result, bootstrapSourceAndStats.values().stream() + .flatMap(f -> f.stream().map(Pair::getValue)).collect(Collectors.toList())); + LOG.info("Done Committing metadata bootstrap !!"); + } + + /** + * Perform Full Bootstrap.
+ * @param partitionFilesList List of partitions and files within that partitions + */ + protected HoodieWriteMetadata fullBootstrap(List<Pair<String, List<HoodieFileStatus>>> partitionFilesList) { + if (null == partitionFilesList || partitionFilesList.isEmpty()) { + return null; + } + TypedProperties properties = new TypedProperties(); + properties.putAll(config.getProps()); + FullBootstrapInputProvider inputProvider = + (FullBootstrapInputProvider) ReflectionUtils.loadClass(config.getFullBootstrapInputProvider(), + properties, jsc); + JavaRDD<HoodieRecord> inputRecordsRDD = + inputProvider.generateInputRecordRDD("bootstrap_source", config.getBootstrapSourceBasePath(), + partitionFilesList); + // Start Full Bootstrap + final HoodieInstant requested = new HoodieInstant(State.REQUESTED, table.getMetaClient().getCommitActionType(), + HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS); + table.getActiveTimeline().createNewInstant(requested); + // Setup correct schema and run bulk insert. + return getBulkInsertActionExecutor(inputRecordsRDD).execute(); + } + + protected CommitActionExecutor<T> getBulkInsertActionExecutor(JavaRDD<HoodieRecord> inputRecordsRDD) { + return new BulkInsertCommitActionExecutor(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps()) + .withSchema(bootstrapSchema).build(), table, HoodieTimeline.FULL_BOOTSTRAP_INSTANT_TS, + inputRecordsRDD, extraMetadata); + } + + private BootstrapWriteStatus handleMetadataBootstrap(String srcPartitionPath, String partitionPath, + HoodieFileStatus srcFileStatus, MetadataBootstrapKeyGenerator keyGenerator) { + + Path sourceFilePath = FileStatusUtils.toPath(srcFileStatus.getPath()); + HoodieBootstrapHandle bootstrapHandle = new HoodieBootstrapHandle(config, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, + table, partitionPath, FSUtils.createNewFileIdPfx(), table.getSparkTaskContextSupplier()); + Schema avroSchema = null; + try { + ParquetMetadata readFooter = ParquetFileReader.readFooter(table.getHadoopConf(), 
sourceFilePath, + ParquetMetadataConverter.NO_FILTER); + MessageType parquetSchema = readFooter.getFileMetaData().getSchema(); + avroSchema = new AvroSchemaConverter().convert(parquetSchema); + Schema recordKeySchema = HoodieAvroUtils.generateProjectionSchema(avroSchema, + keyGenerator.getTopLevelRecordKeyFields()); + LOG.info("Schema to be used for reading record Keys :" + recordKeySchema); + AvroReadSupport.setAvroReadSchema(table.getHadoopConf(), recordKeySchema); + AvroReadSupport.setRequestedProjection(table.getHadoopConf(), recordKeySchema); + + BoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void> wrapper = null; + try (ParquetReader<IndexedRecord> reader = + AvroParquetReader.<IndexedRecord>builder(sourceFilePath).withConf(table.getHadoopConf()).build()) { + wrapper = new SparkBoundedInMemoryExecutor<GenericRecord, HoodieRecord, Void>(config, + new ParquetReaderIterator(reader), new BootstrapRecordWriter(bootstrapHandle), inp -> { + String recKey = keyGenerator.getRecordKey(inp); + GenericRecord gr = new GenericData.Record(HoodieAvroUtils.RECORD_KEY_SCHEMA); + gr.put(HoodieRecord.RECORD_KEY_METADATA_FIELD, recKey); + BootstrapRecordPayload payload = new BootstrapRecordPayload(gr); + HoodieRecord rec = new HoodieRecord(new HoodieKey(recKey, partitionPath), payload); + return rec; + }); + wrapper.execute(); + } catch (Exception e) { + throw new HoodieException(e); + } finally { + bootstrapHandle.close(); + if (null != wrapper) { + wrapper.shutdownNow(); + } + } + } catch (IOException e) { + throw new HoodieIOException(e.getMessage(), e); + } + BootstrapWriteStatus writeStatus = (BootstrapWriteStatus)bootstrapHandle.getWriteStatus(); + BootstrapSourceFileMapping bootstrapSourceFileMapping = new BootstrapSourceFileMapping( + config.getBootstrapSourceBasePath(), srcPartitionPath, partitionPath, + srcFileStatus, writeStatus.getFileId()); + writeStatus.setBootstrapSourceFileMapping(bootstrapSourceFileMapping); + return writeStatus; + } + + /** + * Return 
Bootstrap Mode selections for partitions listed and figure out bootstrap Schema. + * @param metaClient Hoodie Table Meta Client. + * @return + * @throws IOException + */ + private Map<BootstrapMode, List<Pair<String, List<HoodieFileStatus>>>> listAndProcessSourcePartitions( + HoodieTableMetaClient metaClient) throws IOException { + //TODO: Added HoodieFilter for manually testing bootstrap from source hudi table. Needs to be reverted. + final PathFilter hoodieFilter = new HoodieROTablePathFilter(); + List<Pair<String, List<HoodieFileStatus>>> folders = Review comment: HUDI-999 to track parallelizing this step. ########## File path: hudi-common/src/main/java/org/apache/hudi/common/table/timeline/dto/FileStatusDTO.java ########## @@ -68,7 +68,7 @@ public static FileStatusDTO fromFileStatus(FileStatus fileStatus) { dto.blockReplication = fileStatus.getReplication(); dto.blocksize = fileStatus.getBlockSize(); dto.modificationTime = fileStatus.getModificationTime(); - dto.accessTime = fileStatus.getModificationTime(); + dto.accessTime = fileStatus.getAccessTime(); Review comment: FYI : this is a bug but doesn't have any impact currently. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org