vinothchandar commented on a change in pull request #1827:
URL: https://github.com/apache/hudi/pull/1827#discussion_r484598559



##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/AbstractHoodieWriteClient.java
##########
@@ -716,32 +674,97 @@ private void rollbackPendingCommits() {
    * @param compactionInstantTime Compaction Instant Time
    * @return RDD of Write Status
    */
-  private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieTable<T> table = HoodieTable.create(config, hadoopConf);
-    HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
-    HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
-    if (pendingCompactionTimeline.containsInstant(inflightInstant)) {
-      rollbackInflightCompaction(inflightInstant, table);
-      table.getMetaClient().reloadActiveTimeline();
-    }
-    compactionTimer = metrics.getCompactionCtx();
-    HoodieWriteMetadata compactionMetadata = table.compact(jsc, compactionInstantTime);
-    JavaRDD<WriteStatus> statuses = compactionMetadata.getWriteStatuses();
-    if (shouldComplete && compactionMetadata.getCommitMetadata().isPresent()) {
-      completeCompaction(compactionMetadata.getCommitMetadata().get(), statuses, table, compactionInstantTime);
-    }
-    return statuses;
-  }
+  protected abstract O compact(String compactionInstantTime, boolean shouldComplete);
 
   /**
    * Performs a compaction operation on a table, serially before or after an insert/upsert action.
    */
-  private Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
+  protected Option<String> inlineCompact(Option<Map<String, String>> extraMetadata) {
     Option<String> compactionInstantTimeOpt = scheduleCompaction(extraMetadata);
     compactionInstantTimeOpt.ifPresent(compactionInstantTime -> {
       // inline compaction should auto commit as the user is never given control
       compact(compactionInstantTime, true);
     });
     return compactionInstantTimeOpt;
   }
+
+  /**
+   * Finalize Write operation.
+   *
+   * @param table       HoodieTable
+   * @param instantTime Instant Time
+   * @param stats       Hoodie Write Stat
+   */
+  protected void finalizeWrite(HoodieTable<T, I, K, O, P> table, String instantTime, List<HoodieWriteStat> stats) {
+    try {
+      final Timer.Context finalizeCtx = metrics.getFinalizeCtx();
+      table.finalizeWrite(context, instantTime, stats);
+      if (finalizeCtx != null) {
+        Option<Long> durationInMs = Option.of(metrics.getDurationInMs(finalizeCtx.stop()));
+        durationInMs.ifPresent(duration -> {
+          LOG.info("Finalize write elapsed time (milliseconds): " + duration);
+          metrics.updateFinalizeWriteMetrics(duration, stats.size());
+        });
+      }
+    } catch (HoodieIOException ioe) {
+      throw new HoodieCommitException("Failed to complete commit " + instantTime + " due to finalize errors.", ioe);
+    }
+  }
+
+  public HoodieMetrics getMetrics() {
+    return metrics;
+  }
+
+  public HoodieIndex<T, I, K, O, P> getIndex() {
+    return index;
+  }
+
+  /**
+   * Get HoodieTable and init {@link Timer.Context}.
+   *
+   * @param operationType write operation type
+   * @param instantTime   current inflight instant time
+   * @return HoodieTable
+   */
+  protected abstract HoodieTable<T, I, K, O, P> getTableAndInitCtx(WriteOperationType operationType, String instantTime);
+
+  /**
+   * Sets write schema from last instant since deletes may not have schema set in the config.
+   */
+  protected void setWriteSchemaForDeletes(HoodieTableMetaClient metaClient) {
+    try {
+      HoodieActiveTimeline activeTimeline = metaClient.getActiveTimeline();
+      Option<HoodieInstant> lastInstant =
+          activeTimeline.filterCompletedInstants().filter(s -> s.getAction().equals(metaClient.getCommitActionType()))
+              .lastInstant();
+      if (lastInstant.isPresent()) {
+        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
+            activeTimeline.getInstantDetails(lastInstant.get()).get(), HoodieCommitMetadata.class);
+        if (commitMetadata.getExtraMetadata().containsKey(HoodieCommitMetadata.SCHEMA_KEY)) {
+          config.setSchema(commitMetadata.getExtraMetadata().get(HoodieCommitMetadata.SCHEMA_KEY));
+        } else {
+          throw new HoodieIOException("Latest commit does not have any schema in commit metadata");
+        }
+      } else {
+        throw new HoodieIOException("Deletes issued without any prior commits");
+      }
+    } catch (IOException e) {
+      throw new HoodieIOException("IOException thrown while reading last commit metadata", e);
+    }
+  }
+
+  public abstract AsyncCleanerService startAsyncCleaningIfEnabled(AbstractHoodieWriteClient<T, I, K, O, P> client, String instantTime);
+
+  @Override
+  public void close() {

Review comment:
       need to ensure the ordering of closing resources is the same as before.
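
       Roughly what I mean — a sketch only, since the exact members here may differ, but the old client stopped the timeline server before releasing the index:

           @Override
           public void close() {
             // stop the embedded timeline server first, so nothing queries views mid-teardown
             stopEmbeddedServerView(true);
             // then release index resources; all index operations must be finished by this point
             this.index.close();
           }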

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkWriteClient.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.client.embedded.SparkEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieSparkIndexFactory;
+import org.apache.hudi.table.BaseHoodieTimelineArchiveLog;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkTimelineArchiveLog;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkMarkerFiles;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.SparkCompactHelpers;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieSparkWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T,

Review comment:
       Let's name this `SparkRDDWriteClient`?

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkWriteClient.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.client.embedded.SparkEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieSparkIndexFactory;
+import org.apache.hudi.table.BaseHoodieTimelineArchiveLog;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkTimelineArchiveLog;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkMarkerFiles;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.SparkCompactHelpers;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieSparkWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T,
+    JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkWriteClient.class);
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    super(context, writeConfig, rollbackPending);
+  }
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option<BaseEmbeddedTimelineService> timelineService) {
+    super(context, writeConfig, rollbackPending, timelineService);
+  }
+
+  /**
+   * Register hudi classes for Kryo serialization.
+   *
+   * @param conf instance of SparkConf
+   * @return SparkConf
+   */
+  public static SparkConf registerClasses(SparkConf conf) {
+    conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, HoodieRecord.class, HoodieKey.class});
+    return conf;
+  }
+
+  @Override
+  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> createIndex(HoodieWriteConfig writeConfig) {
+    return HoodieSparkIndexFactory.createIndex(config);
+  }
+
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) {
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, stats, extraMetadata);
+  }
+
+  @Override
+  protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    Timer.Context indexTimer = metrics.getIndexCtx();
+    JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
+    metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
+  }
+
+  /**
+   * Main API to run bootstrap to hudi.
+   */
+  @Override
+  public void bootstrap(Option<Map<String, String>> extraMetadata) {
+    if (rollbackPending) {
+      rollBackInflightBootstrap();
+    }
+    HoodieSparkTable table = (HoodieSparkTable) getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    table.bootstrap(context, extraMetadata);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
+    HoodieSparkTable table = (HoodieSparkTable) getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT);
+    startAsyncCleaningIfEnabled(this, instantTime);

Review comment:
       why are we not hanging onto the returned object? 
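
       e.g. hold onto it and wait before completing the write — a sketch (assuming AsyncCleanerService exposes a wait/shutdown hook like the pre-refactor async cleaner did):

           AsyncCleanerService asyncCleanerService = startAsyncCleaningIfEnabled(this, instantTime);
           HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
           // block until async cleaning finishes so the commit does not race it
           AsyncCleanerService.waitForCompletion(asyncCleanerService);
           return postWrite(result, instantTime, table);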

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/action/bootstrap/SparkBootstrapCommitActionExecutor.java
##########
@@ -77,34 +81,44 @@
 import org.apache.parquet.hadoop.ParquetReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.schema.MessageType;
-import org.apache.spark.Partitioner;
+import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 
 import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import java.time.Instant;
 import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-public class BootstrapCommitActionExecutor<T extends HoodieRecordPayload<T>>
-    extends BaseCommitActionExecutor<T, HoodieBootstrapWriteMetadata> {
+public class SparkBootstrapCommitActionExecutor<T extends HoodieRecordPayload>
+    extends BaseCommitActionExecutor<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>, HoodieBootstrapWriteMetadata> {
 
-  private static final Logger LOG = LogManager.getLogger(BootstrapCommitActionExecutor.class);
+  private static final Logger LOG = LogManager.getLogger(SparkBootstrapCommitActionExecutor.class);
   protected String bootstrapSchema = null;
   private transient FileSystem bootstrapSourceFileSystem;
 
-  public BootstrapCommitActionExecutor(JavaSparkContext jsc, HoodieWriteConfig config, HoodieTable<?> table,
-      Option<Map<String, String>> extraMetadata) {
-    super(jsc, new HoodieWriteConfig.Builder().withProps(config.getProps())
-        .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
-        .withBulkInsertParallelism(config.getBootstrapParallelism())
-        .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
+  public SparkBootstrapCommitActionExecutor(HoodieSparkEngineContext context,
+                                            HoodieWriteConfig config,
+                                            HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> table,
+                                            Option<Map<String, String>> extraMetadata) {
+    super(context, new HoodieWriteConfig.Builder().withProps(config.getProps())
+            .withAutoCommit(true).withWriteStatusClass(BootstrapWriteStatus.class)
+            .withBulkInsertParallelism(config.getBootstrapParallelism())
+            .build(), table, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS, WriteOperationType.BOOTSTRAP,
         extraMetadata);
     bootstrapSourceFileSystem = FSUtils.getFs(config.getBootstrapSourceBasePath(), hadoopConf);
   }
 
+  @Override
+  public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {

Review comment:
       hmmm, why do we return null here?
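
       If this overload is never meant to be called (bootstrap is not driven by an input record RDD), failing fast seems safer than returning null — a sketch:

           @Override
           public HoodieWriteMetadata<JavaRDD<WriteStatus>> execute(JavaRDD<HoodieRecord<T>> inputRecordsRDD) {
             // surface misuse loudly instead of handing back null
             throw new UnsupportedOperationException("Bootstrap does not accept an input record RDD");
           }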

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/HoodieSparkWriteClient.java
##########
@@ -0,0 +1,360 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client;
+
+import com.codahale.metrics.Timer;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.client.embedded.SparkEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.model.HoodieCommitMetadata;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieWriteStat;
+import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.table.timeline.HoodieActiveTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieCommitException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.index.HoodieIndex;
+import org.apache.hudi.index.HoodieSparkIndexFactory;
+import org.apache.hudi.table.BaseHoodieTimelineArchiveLog;
+import org.apache.hudi.table.BulkInsertPartitioner;
+import org.apache.hudi.table.HoodieSparkTable;
+import org.apache.hudi.table.HoodieSparkTimelineArchiveLog;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.hudi.table.SparkMarkerFiles;
+import org.apache.hudi.table.action.HoodieWriteMetadata;
+import org.apache.hudi.table.action.compact.SparkCompactHelpers;
+import org.apache.hudi.table.upgrade.SparkUpgradeDowngrade;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+import java.text.ParseException;
+import java.util.List;
+import java.util.Map;
+
+public class HoodieSparkWriteClient<T extends HoodieRecordPayload> extends AbstractHoodieWriteClient<T,
+    JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkWriteClient.class);
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig clientConfig) {
+    super(context, clientConfig);
+  }
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending) {
+    super(context, writeConfig, rollbackPending);
+  }
+
+  public HoodieSparkWriteClient(HoodieEngineContext context, HoodieWriteConfig writeConfig, boolean rollbackPending, Option<BaseEmbeddedTimelineService> timelineService) {
+    super(context, writeConfig, rollbackPending, timelineService);
+  }
+
+  /**
+   * Register hudi classes for Kryo serialization.
+   *
+   * @param conf instance of SparkConf
+   * @return SparkConf
+   */
+  public static SparkConf registerClasses(SparkConf conf) {
+    conf.registerKryoClasses(new Class[]{HoodieWriteConfig.class, 
HoodieRecord.class, HoodieKey.class});
+    return conf;
+  }
+
+  @Override
+  protected HoodieIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> createIndex(HoodieWriteConfig writeConfig) {
+    return HoodieSparkIndexFactory.createIndex(config);
+  }
+
+  @Override
+  public boolean commit(String instantTime, JavaRDD<WriteStatus> writeStatuses, Option<Map<String, String>> extraMetadata) {
+    List<HoodieWriteStat> stats = writeStatuses.map(WriteStatus::getStat).collect();
+    return commitStats(instantTime, stats, extraMetadata);
+  }
+
+  @Override
+  protected HoodieTable<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
+  }
+
+  @Override
+  public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> hoodieRecords) {
+    // Create a Hoodie table which encapsulated the commits and files visible
+    HoodieTable table = HoodieSparkTable.create(config, context);
+    Timer.Context indexTimer = metrics.getIndexCtx();
+    JavaRDD<HoodieRecord<T>> recordsWithLocation = getIndex().tagLocation(hoodieRecords, context, table);
+    metrics.updateIndexMetrics(LOOKUP_STR, metrics.getDurationInMs(indexTimer == null ? 0L : indexTimer.stop()));
+    return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
+  }
+
+  /**
+   * Main API to run bootstrap to hudi.
+   */
+  @Override
+  public void bootstrap(Option<Map<String, String>> extraMetadata) {
+    if (rollbackPending) {
+      rollBackInflightBootstrap();
+    }
+    HoodieSparkTable table = (HoodieSparkTable) getTableAndInitCtx(WriteOperationType.UPSERT, HoodieTimeline.METADATA_BOOTSTRAP_INSTANT_TS);
+    table.bootstrap(context, extraMetadata);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> upsert(JavaRDD<HoodieRecord<T>> records, String instantTime) {
+    HoodieSparkTable table = (HoodieSparkTable) getTableAndInitCtx(WriteOperationType.UPSERT, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT);
+    startAsyncCleaningIfEnabled(this, instantTime);
+    HoodieWriteMetadata<JavaRDD<WriteStatus>> result = table.upsert(context, instantTime, records);
+    if (result.getIndexLookupDuration().isPresent()) {
+      metrics.updateIndexMetrics(LOOKUP_STR, result.getIndexLookupDuration().get().toMillis());
+    }
+    return postWrite(result, instantTime, table);
+  }
+
+  @Override
+  public JavaRDD<WriteStatus> upsertPreppedRecords(JavaRDD<HoodieRecord<T>> preppedRecords, String instantTime) {
+    HoodieSparkTable table = (HoodieSparkTable) getTableAndInitCtx(WriteOperationType.UPSERT_PREPPED, instantTime);
+    table.validateUpsertSchema();
+    setOperationType(WriteOperationType.UPSERT_PREPPED);
+    startAsyncCleaningIfEnabled(this, instantTime);

Review comment:
       same here and everywhere else. 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/embedded/SparkEmbeddedTimelineService.java
##########
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.client.embedded;
+
+import org.apache.hudi.client.embebbed.BaseEmbeddedTimelineService;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
+import org.apache.hudi.common.table.view.FileSystemViewStorageConfig;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.SparkConf;
+
+/**
+ * Spark implementation of Timeline Service.
+ */
+public class SparkEmbeddedTimelineService extends BaseEmbeddedTimelineService {
+
+  private static final Logger LOG = LogManager.getLogger(SparkEmbeddedTimelineService.class);
+
+  public SparkEmbeddedTimelineService(HoodieEngineContext context, FileSystemViewStorageConfig config) {
+    super(context, config);
+  }
+
+  @Override
+  public void setHostAddrFromContext(HoodieEngineContext context) {
+    SparkConf sparkConf = HoodieSparkEngineContext.getSparkContext(context).getConf();
+    String hostAddr = sparkConf.get("spark.driver.host", null);

Review comment:
       I think we can eliminate the need for breaking this up into spark vs non-spark by just passing in the host. This class does not make much sense being broken up.
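
       Sketch of the single-class shape (the constructor parameter is my suggestion, not existing API):

           /** Engine-agnostic timeline service: the caller resolves and passes the host. */
           public class EmbeddedTimelineService {
             private final String hostAddr;
             private final FileSystemViewStorageConfig config;

             public EmbeddedTimelineService(String hostAddr, FileSystemViewStorageConfig config) {
               // on the spark side the caller would pass sparkConf.get("spark.driver.host", null)
               this.hostAddr = hostAddr;
               this.config = config;
             }
           }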

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkMergeHandle.java
##########
@@ -54,9 +60,9 @@
 import java.util.Set;
 
 @SuppressWarnings("Duplicates")
-public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
+public class HoodieSparkMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {

Review comment:
       at the MergeHandle level, we need not introduce any notion of RDDs. The `io` package should already be free of spark. All we need to do is pass in the taskContextSupplier correctly, right? This is a large outstanding issue we need to resolve.
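
       Roughly what I have in mind (a sketch; this mirrors the pre-refactor shape rather than any final API):

           // io-level handle stays engine-agnostic: no RDD types in the signature,
           // engine specifics enter only through the TaskContextSupplier
           public class HoodieMergeHandle<T extends HoodieRecordPayload> extends HoodieWriteHandle<T> {
             public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
                 Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId,
                 TaskContextSupplier taskContextSupplier) {
               super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
             }
           }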

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/simple/HoodieSparkGlobalSimpleIndex.java
##########
@@ -71,43 +75,14 @@ public HoodieGlobalSimpleIndex(HoodieWriteConfig config) {
    * @return {@link JavaRDD} of records with record locations set
    */
   protected JavaRDD<HoodieRecord<T>> tagLocationInternal(JavaRDD<HoodieRecord<T>> inputRecordRDD, JavaSparkContext jsc,
-                                                         HoodieTable<T> hoodieTable) {
+                                                         HoodieTable hoodieTable) {
 
     JavaPairRDD<String, HoodieRecord<T>> keyedInputRecordRDD = inputRecordRDD.mapToPair(entry -> new Tuple2<>(entry.getRecordKey(), entry));
     JavaPairRDD<HoodieKey, HoodieRecordLocation> allRecordLocationsInTable = fetchAllRecordLocations(jsc, hoodieTable,
         config.getGlobalSimpleIndexParallelism());
     return getTaggedRecords(keyedInputRecordRDD, allRecordLocationsInTable);
   }
 
-  /**
-   * Fetch record locations for passed in {@link HoodieKey}s.
-   *
-   * @param jsc         instance of {@link JavaSparkContext} to use
-   * @param hoodieTable instance of {@link HoodieTable} of interest
-   * @param parallelism parallelism to use
-   * @return {@link JavaPairRDD} of {@link HoodieKey} and {@link HoodieRecordLocation}
-   */
-  protected JavaPairRDD<HoodieKey, HoodieRecordLocation> fetchAllRecordLocations(JavaSparkContext jsc,
-                                                                                 HoodieTable hoodieTable,
-                                                                                 int parallelism) {
-    List<Pair<String, HoodieBaseFile>> latestBaseFiles = getAllBaseFilesInTable(jsc, hoodieTable);
-    return fetchRecordLocations(jsc, hoodieTable, parallelism, latestBaseFiles);
-  }
-
-  /**
-   * Load all files for all partitions as <Partition, filename> pair RDD.
-   */
-  protected List<Pair<String, HoodieBaseFile>> getAllBaseFilesInTable(final JavaSparkContext jsc, final HoodieTable hoodieTable) {

Review comment:
       note to self: make sure these methods are in the base class now 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/index/hbase/HoodieSparkHBaseIndex.java
##########
@@ -18,169 +18,60 @@
 
 package org.apache.hudi.index.hbase;
 
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.BufferedMutator;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Get;
+import org.apache.hadoop.hbase.client.HTable;
+import org.apache.hadoop.hbase.client.Mutation;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hudi.client.WriteStatus;
 import org.apache.hudi.client.utils.SparkConfigUtils;
+import org.apache.hudi.common.HoodieEngineContext;
+import org.apache.hudi.common.HoodieSparkEngineContext;
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
-import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ReflectionUtils;
 import org.apache.hudi.common.util.collection.Pair;
-import org.apache.hudi.config.HoodieHBaseIndexConfig;
 import org.apache.hudi.config.HoodieWriteConfig;
-import org.apache.hudi.exception.HoodieDependentSystemUnavailableException;
 import org.apache.hudi.exception.HoodieIndexException;
-import org.apache.hudi.index.HoodieIndex;
 import org.apache.hudi.table.HoodieTable;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.hbase.HBaseConfiguration;
-import org.apache.hadoop.hbase.HRegionLocation;
-import org.apache.hadoop.hbase.TableName;
-import org.apache.hadoop.hbase.client.BufferedMutator;
-import org.apache.hadoop.hbase.client.Connection;
-import org.apache.hadoop.hbase.client.ConnectionFactory;
-import org.apache.hadoop.hbase.client.Delete;
-import org.apache.hadoop.hbase.client.Get;
-import org.apache.hadoop.hbase.client.HTable;
-import org.apache.hadoop.hbase.client.Mutation;
-import org.apache.hadoop.hbase.client.Put;
-import org.apache.hadoop.hbase.client.RegionLocator;
-import org.apache.hadoop.hbase.client.Result;
-import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
+import scala.Tuple2;
 
 import java.io.IOException;
-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 
-import scala.Tuple2;
+public class HoodieSparkHBaseIndex<T extends HoodieRecordPayload> extends BaseHoodieHBaseIndex<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {
 
-/**
- * Hoodie Index implementation backed by HBase.
- */
-public class HBaseIndex<T extends HoodieRecordPayload> extends HoodieIndex<T> {
+  private static final Logger LOG = LogManager.getLogger(HoodieSparkHBaseIndex.class);
 
   public static final String DEFAULT_SPARK_EXECUTOR_INSTANCES_CONFIG_NAME = "spark.executor.instances";
   public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_ENABLED_CONFIG_NAME = "spark.dynamicAllocation.enabled";
   public static final String DEFAULT_SPARK_DYNAMIC_ALLOCATION_MAX_EXECUTORS_CONFIG_NAME =
       "spark.dynamicAllocation.maxExecutors";
 
-  private static final byte[] SYSTEM_COLUMN_FAMILY = Bytes.toBytes("_s");
-  private static final byte[] COMMIT_TS_COLUMN = Bytes.toBytes("commit_ts");
-  private static final byte[] FILE_NAME_COLUMN = Bytes.toBytes("file_name");
-  private static final byte[] PARTITION_PATH_COLUMN = Bytes.toBytes("partition_path");
-  private static final int SLEEP_TIME_MILLISECONDS = 100;
-
-  private static final Logger LOG = LogManager.getLogger(HBaseIndex.class);
-  private static Connection hbaseConnection = null;
-  private HBaseIndexQPSResourceAllocator hBaseIndexQPSResourceAllocator = null;
-  private float qpsFraction;
-  private int maxQpsPerRegionServer;
-  /**
-   * multiPutBatchSize will be computed and re-set in updateLocation if
-   * {@link HoodieHBaseIndexConfig#HBASE_PUT_BATCH_SIZE_AUTO_COMPUTE_PROP} is set to true.
-   */
-  private Integer multiPutBatchSize;
-  private Integer numRegionServersForTable;
-  private final String tableName;
-  private HBasePutBatchSizeCalculator putBatchSizeCalculator;
-
-  public HBaseIndex(HoodieWriteConfig config) {
+  public HoodieSparkHBaseIndex(HoodieWriteConfig config) {
     super(config);
-    this.tableName = config.getHbaseTableName();
-    addShutDownHook();
-    init(config);
-  }
-
-  private void init(HoodieWriteConfig config) {
-    this.multiPutBatchSize = config.getHbaseIndexGetBatchSize();
-    this.qpsFraction = config.getHbaseIndexQPSFraction();
-    this.maxQpsPerRegionServer = config.getHbaseIndexMaxQPSPerRegionServer();
-    this.putBatchSizeCalculator = new HBasePutBatchSizeCalculator();
-    this.hBaseIndexQPSResourceAllocator = createQPSResourceAllocator(this.config);
-  }
-
-  public HBaseIndexQPSResourceAllocator createQPSResourceAllocator(HoodieWriteConfig config) {

Review comment:
       note to self: make sure these methods are now in the base class

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/SparkCreateHandleFactory.java
##########
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.TaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+public class SparkCreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {
+
+  @Override
+  public HoodieSparkCreateHandle create(final HoodieWriteConfig hoodieConfig,

Review comment:
       same. is there a way to not make these spark-specific?
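
       For instance (sketch; the parameter list is illustrative and assumes the handle itself is engine-agnostic, per the comment on HoodieSparkMergeHandle above):

           /** One factory for all engines: nothing spark-specific in the signature. */
           public class CreateHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T> {
             @Override
             public HoodieCreateHandle<T> create(HoodieWriteConfig config, String instantTime,
                 HoodieTable hoodieTable, String partitionPath, String fileIdPrefix,
                 TaskContextSupplier taskContextSupplier) {
               return new HoodieCreateHandle<>(config, instantTime, hoodieTable, partitionPath,
                   fileIdPrefix, taskContextSupplier);
             }
           }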

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/SparkAppendHandleFactory.java
##########
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.TaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
+import org.apache.hudi.config.HoodieWriteConfig;
+
+import org.apache.hudi.table.HoodieTable;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+
+/**
+ * Factory to create {@link HoodieSparkAppendHandle}.
+ */
+public class SparkAppendHandleFactory<T extends HoodieRecordPayload> extends WriteHandleFactory<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>, JavaPairRDD<HoodieKey, Option<Pair<String, String>>>> {

Review comment:
       same here. we need to make sure these factory methods don't have spark 
vs non-spark versions

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/HoodieSparkMergeHandle.java
##########
@@ -71,34 +77,25 @@
   protected boolean useWriterSchema;
   private HoodieBaseFile baseFileToMerge;
 
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
+  public HoodieSparkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+                                Iterator<HoodieRecord<T>> recordItr, String partitionPath, String fileId, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     init(fileId, recordItr);
     init(fileId, partitionPath, hoodieTable.getBaseFileOnlyView().getLatestBaseFile(partitionPath, fileId).get());
   }
 
   /**
    * Called by compactor code path.
    */
-  public HoodieMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> hoodieTable,
-      Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
-      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier sparkTaskContextSupplier) {
-    super(config, instantTime, partitionPath, fileId, hoodieTable, sparkTaskContextSupplier);
+  public HoodieSparkMergeHandle(HoodieWriteConfig config, String instantTime, HoodieTable hoodieTable,
+                                Map<String, HoodieRecord<T>> keyToNewRecords, String partitionPath, String fileId,
+                                HoodieBaseFile dataFileToBeMerged, TaskContextSupplier taskContextSupplier) {
+    super(config, instantTime, partitionPath, fileId, hoodieTable, taskContextSupplier);
     this.keyToNewRecords = keyToNewRecords;
     this.useWriterSchema = true;
     init(fileId, this.partitionPath, dataFileToBeMerged);
   }
 
-  @Override

Review comment:
       please refrain from moving methods around within the file. it makes life 
hard during review :( 

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/SparkWorkloadProfile.java
##########
@@ -22,49 +22,22 @@
 import org.apache.hudi.common.model.HoodieRecordLocation;
 import org.apache.hudi.common.model.HoodieRecordPayload;
 import org.apache.hudi.common.util.Option;
-
 import org.apache.spark.api.java.JavaRDD;
+import scala.Tuple2;
 
-import java.io.Serializable;
-import java.util.HashMap;
 import java.util.Map;
-import java.util.Set;
-
-import scala.Tuple2;
 
 /**
- * Information about incoming records for upsert/insert obtained either via sampling or introspecting the data fully.
- * <p>
- * TODO(vc): Think about obtaining this directly from index.tagLocation
+ * Spark implementation of {@link BaseWorkloadProfile}.
+ * @param <T>
  */
-public class WorkloadProfile<T extends HoodieRecordPayload> implements Serializable {
-
-  /**
-   * Input workload.
-   */
-  private final JavaRDD<HoodieRecord<T>> taggedRecords;
-
-  /**
-   * Computed workload profile.
-   */
-  private final HashMap<String, WorkloadStat> partitionPathStatMap;
-
-  /**
-   * Global workloadStat.
-   */
-  private final WorkloadStat globalStat;
-
-  public WorkloadProfile(JavaRDD<HoodieRecord<T>> taggedRecords) {
-    this.taggedRecords = taggedRecords;
-    this.partitionPathStatMap = new HashMap<>();
-    this.globalStat = new WorkloadStat();
-    buildProfile();
+public class SparkWorkloadProfile<T extends HoodieRecordPayload> extends BaseWorkloadProfile<JavaRDD<HoodieRecord<T>>> {

Review comment:
       we can actually try to keep this generic and just pass in what we need from `taggedRecords` to the constructor, instead of the entire thing.
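
       i.e. something like this (a sketch; the stat method names are assumed from the old buildProfile logic):

           /** Engine-agnostic profile: consumes extracted (partitionPath, location) pairs, not the RDD itself. */
           public class WorkloadProfile implements Serializable {
             private final HashMap<String, WorkloadStat> partitionPathStatMap = new HashMap<>();
             private final WorkloadStat globalStat = new WorkloadStat();

             public WorkloadProfile(List<Pair<String, Option<HoodieRecordLocation>>> inputDescriptor) {
               for (Pair<String, Option<HoodieRecordLocation>> entry : inputDescriptor) {
                 WorkloadStat stat = partitionPathStatMap.computeIfAbsent(entry.getLeft(), k -> new WorkloadStat());
                 if (entry.getRight().isPresent()) {
                   stat.addUpdates(entry.getRight().get(), 1);       // tagged record -> update
                   globalStat.addUpdates(entry.getRight().get(), 1);
                 } else {
                   stat.addInserts(1);                               // untagged record -> insert
                   globalStat.addInserts(1);
                 }
               }
             }
           }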




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

