xushiyan commented on a change in pull request #2710:
URL: https://github.com/apache/hudi/pull/2710#discussion_r615463905



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.error;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieErrorTableConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroSchemaPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_COMMIT_TIME_METADATA_FIELD;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_KEY_METADATA_FIELD;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_PARTITION_PATH_METADATA_FIELD;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_FILE_ID_FIELD;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_TABLE_NAME;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_UUID;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_TS;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_SCHEMA;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_RECORD;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_MESSAGE;
+import static org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_CONTEXT;
+
+/**
+ * Writer implementation backed by an internal hudi table. Error records are saved within an internal COW table
+ * called Error table.
+ */
+public abstract class HoodieBackedErrorTableWriter<T extends HoodieRecordPayload, I, K, O>  implements Serializable {
+
+  private static final Logger LOG = LogManager.getLogger(HoodieBackedErrorTableWriter.class);
+
+  protected HoodieWriteConfig errorTableWriteConfig;
+  protected HoodieWriteConfig datasetWriteConfig;
+  protected String tableName;
+
+  protected HoodieTableMetaClient metaClient;
+  protected SerializableConfiguration hadoopConf;
+  protected final transient HoodieEngineContext engineContext;
+  protected String basePath;
+
+  protected HoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    this.datasetWriteConfig = writeConfig;
+    this.engineContext = engineContext;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+
+    if (writeConfig.errorTableEnabled()) {
+      this.tableName = writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX;
+      this.basePath = getErrorTableBasePath(writeConfig);
+      this.errorTableWriteConfig = createErrorDataWriteConfig(writeConfig);
+      initialize(engineContext, metaClient);
+      this.metaClient = HoodieTableMetaClient.builder()
+                            .setConf(hadoopConf)
+                            .setBasePath(errorTableWriteConfig.getBasePath()).build();
+    }
+  }
+
+  /**
+   * Create a {@code HoodieWriteConfig} to use for the Error Table.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   */
+  private HoodieWriteConfig createErrorDataWriteConfig(HoodieWriteConfig writeConfig) {
+    int parallelism = writeConfig.getErrorTableInsertParallelism();
+    // Create the write config for the metadata table by borrowing options from the main write config.
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+                                            .withEmbeddedTimelineServerEnabled(false)
+                                            .withPath(basePath)
+                                            .withSchema(HoodieErrorTableConfig.ERROR_TABLE_SCHEMA)
+                                            .forTable(getErrorTableName(writeConfig))
+                                            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                                                                      .withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+                                                                      .retainCommits(writeConfig.getErrorTableCleanerCommitsRetained())
+                                                                      .archiveCommitsWith(writeConfig.getErrorTableMinCommitsToKeep(),
+                                                                          writeConfig.getMetadataMaxCommitsToKeep()).build())
+                                            .withParallelism(parallelism, parallelism);
+
+    return builder.build();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {

Review comment:
       ```suggestion
       public HoodieWriteConfig getErrorTableWriteConfig() {
       ```
       given there are 2 write configs, you want to be specific about which one to get.

##########
File path: hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieROTablePathFilter.java
##########
@@ -148,8 +148,9 @@ public boolean accept(Path path) {
 
       // Skip all files that are descendants of .hoodie in its path.
       String filePath = path.toString();
-      if (filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
-          || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME)) {
+      if ((filePath.contains("/" + HoodieTableMetaClient.METAFOLDER_NAME + "/")
+          || filePath.endsWith("/" + HoodieTableMetaClient.METAFOLDER_NAME))
+          && !filePath.contains("/" + HoodieTableMetaClient.ERROR_TABLE_FOLDER_NAME)) {

Review comment:
       can you clarify a bit why the error table is excluded from this skipping? I thought everything under `.hoodie/` should be skipped here.
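
   For context, a minimal sketch of why the extra clause matters, assuming the error table defaults to living under the metafolder (`<basePath>/.hoodie/errors`, per `getErrorTableBasePath` in this PR): without the exclusion, the error table's own base files would also be rejected by this filter.

   ```java
   // Illustrative paths only (hypothetical table location).
   String metaFile  = "/warehouse/trips/.hoodie/hoodie.properties";
   String errorFile = "/warehouse/trips/.hoodie/errors/2021/04/19/part-0001.parquet";

   // Both match the METAFOLDER_NAME containment check...
   boolean metaMatch  = metaFile.contains("/.hoodie/");   // true
   boolean errorMatch = errorFile.contains("/.hoodie/");  // true

   // ...so only the extra ERROR_TABLE_FOLDER_NAME clause lets the error
   // table's parquet files through while the rest of .hoodie/ stays skipped.
   ```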

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  public HoodieWriteConfig getWriteConfig() {
+    return errorTableWriteConfig;
+  }
+
+  /**
+   *  Init if hudi error table not exit.
+   * @param datasetMetaClient
+   * @throws IOException
+   */
+  protected void bootstrapErrorTable(HoodieTableMetaClient datasetMetaClient) throws IOException {
+
+    if (datasetMetaClient == null) {
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.COPY_ON_WRITE)
+          .setTableName(tableName)
+          .setArchiveLogFolder("archived")
+          .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+          .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+          .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
+    } else {
+      boolean exists = datasetMetaClient.getFs().exists(new Path(errorTableWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+      if (!exists) {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.COPY_ON_WRITE)
+            .setTableName(tableName)
+            .setArchiveLogFolder("archived")
+            .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+            .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
+      }
+    }
+  }
+
+  /**
+   *  Build hudi error table base path.
+   * @param writeConfig
+   * @return
+   */
+  private String getErrorTableBasePath(HoodieWriteConfig writeConfig) {
+
+    if (StringUtils.isNullOrEmpty(writeConfig.getErrorTableBasePath())) {
+      return writeConfig.getBasePath() + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "errors";
+    }
+    return writeConfig.getErrorTableBasePath();
+  }
+
+  /**
+   *  Build hudi error table name.
+   * @param writeConfig
+   * @return
+   */
+  private String getErrorTableName(HoodieWriteConfig writeConfig) {
+
+    return StringUtils.isNullOrEmpty(writeConfig.getErrorTableName())
+               ? writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX : writeConfig.getErrorTableName();
+  }
+
+  protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);

Review comment:
       can you add some javadoc to explain the purpose and usage of this method, so that subclasses know how to implement it?
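
   For instance, a sketch of the kind of javadoc that would help; the wording below is my assumption of the intent, not taken from the PR:

   ```java
   /**
    * Engine-specific hook, invoked once from the constructor when the error
    * table is enabled. Implementations are expected to bootstrap the error
    * table if it does not exist yet (e.g. via bootstrapErrorTable).
    *
    * @param engineContext     context of the engine in use (Spark, Flink, Java)
    * @param datasetMetaClient meta client of the main dataset; may be null
    */
   protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
   ```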

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  /**
+   *  Build hudi error table base path.
+   * @param writeConfig
+   * @return
+   */

Review comment:
       the method is very self-explanatory and it's unusual to put javadoc on private methods, so the docs here look redundant.
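
   i.e. the method reads fine on its own once the boilerplate javadoc is dropped:

   ```java
   private String getErrorTableBasePath(HoodieWriteConfig writeConfig) {
     if (StringUtils.isNullOrEmpty(writeConfig.getErrorTableBasePath())) {
       // Default to a folder under the dataset's metafolder.
       return writeConfig.getBasePath() + Path.SEPARATOR
           + HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "errors";
     }
     return writeConfig.getErrorTableBasePath();
   }
   ```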

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  /**
+   *  Build hudi error table name.
+   * @param writeConfig
+   * @return
+   */

Review comment:
       ditto

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
+
+  public abstract void commit(O writeStatuses, HoodieTable<T, I, K, O> hoodieTable);

Review comment:
       same: please add javadoc for this abstract method as well.
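
   As above, a sketch of possible javadoc; the wording is my assumption of the intent:

   ```java
   /**
    * Persists the error records derived from the given write statuses into
    * the error table.
    *
    * @param writeStatuses engine-specific collection of WriteStatus from a write
    * @param hoodieTable   the table instance the failed write ran against
    */
   public abstract void commit(O writeStatuses, HoodieTable<T, I, K, O> hoodieTable);
   ```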

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  protected HoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    this.datasetWriteConfig = writeConfig;
+    this.engineContext = engineContext;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+
+    if (writeConfig.errorTableEnabled()) {
+      this.tableName = writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX;
+      this.basePath = getErrorTableBasePath(writeConfig);
+      this.errorTableWriteConfig = createErrorDataWriteConfig(writeConfig);
+      initialize(engineContext, metaClient);
+      this.metaClient = HoodieTableMetaClient.builder()
+                            .setConf(hadoopConf)
+                            .setBasePath(errorTableWriteConfig.getBasePath()).build();

Review comment:
       when `initialize()` is called here, isn't `metaClient` still null?
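
   If a non-null meta client is intended, one possible reordering, sketched under the assumption that the error table has already been bootstrapped (otherwise `initialize` implementations may need to build their own meta client after creating the table):

   ```java
   // Sketch: construct the meta client before handing it to initialize(),
   // so the callback does not receive the still-null field.
   this.metaClient = HoodieTableMetaClient.builder()
       .setConf(hadoopConf)
       .setBasePath(errorTableWriteConfig.getBasePath())
       .build();
   initialize(engineContext, metaClient);
   ```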

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.error;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.config.HoodieErrorTableConfig;
+import org.apache.hudi.common.config.SerializableConfiguration;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieCleaningPolicy;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.model.HoodieTableType;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.model.HoodieRecordLocation;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroSchemaPayload;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieAvroPayload;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.config.HoodieCompactionConfig;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_COMMIT_TIME_METADATA_FIELD;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_KEY_METADATA_FIELD;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_PARTITION_PATH_METADATA_FIELD;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_FILE_ID_FIELD;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_TABLE_NAME;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_UUID;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_TS;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_SCHEMA;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_RECORD;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_MESSAGE;
+import static 
org.apache.hudi.common.config.HoodieErrorTableConfig.ERROR_RECORD_CONTEXT;
+
+/**
+ * Writer implementation backed by an internal hudi table. Error records are 
saved within an internal COW table
+ * called Error table.
+ */
+public abstract class HoodieBackedErrorTableWriter<T extends 
HoodieRecordPayload, I, K, O>  implements Serializable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieBackedErrorTableWriter.class);
+
+  protected HoodieWriteConfig errorTableWriteConfig;
+  protected HoodieWriteConfig datasetWriteConfig;
+  protected String tableName;
+
+  protected HoodieTableMetaClient metaClient;
+  protected SerializableConfiguration hadoopConf;
+  protected final transient HoodieEngineContext engineContext;
+  protected String basePath;
+
+  protected HoodieBackedErrorTableWriter(Configuration hadoopConf, 
HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    this.datasetWriteConfig = writeConfig;
+    this.engineContext = engineContext;
+    this.hadoopConf = new SerializableConfiguration(hadoopConf);
+
+    if (writeConfig.errorTableEnabled()) {
+      this.tableName = writeConfig.getTableName() + 
HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX;
+      this.basePath = getErrorTableBasePath(writeConfig);
+      this.errorTableWriteConfig = createErrorDataWriteConfig(writeConfig);
+      initialize(engineContext, metaClient);
+      this.metaClient = HoodieTableMetaClient.builder()
+                            .setConf(hadoopConf)
+                            
.setBasePath(errorTableWriteConfig.getBasePath()).build();
+    }
+  }
+
+  /**
+   * Create a {@code HoodieWriteConfig} to use for the Error Table.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   */
+  private HoodieWriteConfig createErrorDataWriteConfig(HoodieWriteConfig 
writeConfig) {
+    int parallelism = writeConfig.getErrorTableInsertParallelism();
+    // Create the write config for the metadata table by borrowing options 
from the main write config.
+    HoodieWriteConfig.Builder builder = HoodieWriteConfig.newBuilder()
+                                            
.withEmbeddedTimelineServerEnabled(false)
+                                            .withPath(basePath)
+                                            
.withSchema(HoodieErrorTableConfig.ERROR_TABLE_SCHEMA)
+                                            
.forTable(getErrorTableName(writeConfig))
+                                            
.withCompactionConfig(HoodieCompactionConfig.newBuilder()
+                                                                      
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_COMMITS)
+                                                                      
.retainCommits(writeConfig.getErrorTableCleanerCommitsRetained())
+                                                                      
.archiveCommitsWith(writeConfig.getErrorTableMinCommitsToKeep(),
+                                                                          
writeConfig.getMetadataMaxCommitsToKeep()).build())
+                                            .withParallelism(parallelism, 
parallelism);
+
+    return builder.build();
+  }
+
+  public HoodieWriteConfig getWriteConfig() {
+    return errorTableWriteConfig;
+  }
+
+  /**
+   * Initialize the hudi error table if it does not exist.
+   *
+   * @param datasetMetaClient meta client of the main dataset; may be null before the dataset is initialized
+   * @throws IOException on filesystem errors while checking for or initializing the table
+   */
+  protected void bootstrapErrorTable(HoodieTableMetaClient datasetMetaClient) throws IOException {
+
+    if (datasetMetaClient == null) {
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.COPY_ON_WRITE)
+          .setTableName(tableName)
+          .setArchiveLogFolder("archived")
+          .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+          .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+          .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
+    } else {
+      boolean exists = datasetMetaClient.getFs().exists(new Path(errorTableWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+      if (!exists) {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.COPY_ON_WRITE)
+            .setTableName(tableName)
+            .setArchiveLogFolder("archived")
+            .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+            .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
+      }
+    }
+  }
+
+  /**
+   * Build the hudi error table base path.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @return the configured error table base path, or a default under the dataset's metafolder
+   */
+  private String getErrorTableBasePath(HoodieWriteConfig writeConfig) {
+
+    if (StringUtils.isNullOrEmpty(writeConfig.getErrorTableBasePath())) {
+      return writeConfig.getBasePath() + Path.SEPARATOR + HoodieTableMetaClient.METAFOLDER_NAME + Path.SEPARATOR + "errors";
+    }
+    return writeConfig.getErrorTableBasePath();
+  }
+
+  /**
+   * Build the hudi error table name.
+   *
+   * @param writeConfig {@code HoodieWriteConfig} of the main dataset writer
+   * @return the configured error table name, or the dataset table name plus the error table suffix
+   */
+  private String getErrorTableName(HoodieWriteConfig writeConfig) {
+
+    return StringUtils.isNullOrEmpty(writeConfig.getErrorTableName())
+               ? writeConfig.getTableName() + HoodieErrorTableConfig.ERROR_TABLE_NAME_SUFFIX : writeConfig.getErrorTableName();
+  }
+
+  protected abstract void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient);
+
+  public abstract void commit(O writeStatuses, HoodieTable<T, I, K, O> hoodieTable);
+
+  public List<HoodieRecord> createErrorRecord(WriteStatus status, String schema, String tableName) {
+
+    HashMap<HoodieKey, Throwable> errorsMap = status.getErrors();
+    List<HoodieRecord> errorHoodieRecords = new ArrayList<>();
+    for (HoodieRecord hoodieRecord : status.getFailedRecords()) {
+
+      String uuid = UUID.randomUUID().toString();
+
+      long timeMillis = System.currentTimeMillis();
+      String ts = String.valueOf(timeMillis);
+      DateTimeZone dateTimeZone = null;
+      String partitionPath = new DateTime(timeMillis, dateTimeZone).toString("yyyy/MM/dd");
+
+      HoodieKey hoodieKey = hoodieRecord.getKey();
+
+      HoodieRecordLocation hoodieRecordLocation = null;
+      if (hoodieRecord.getNewLocation().isPresent()) {
+        hoodieRecordLocation = (HoodieRecordLocation) hoodieRecord.getNewLocation().get();
+      }
+
+      String instantTime = hoodieRecordLocation == null ? "" : hoodieRecordLocation.getInstantTime();
+      String fileId = hoodieRecordLocation == null ? "" : hoodieRecordLocation.getFileId();
+      String message = errorsMap.get(hoodieKey).toString();
+
+      OverwriteWithLatestAvroSchemaPayload data = (OverwriteWithLatestAvroSchemaPayload) hoodieRecord.getData();
+      GenericRecord genericRecord = null;
+      try {
+        genericRecord = HoodieAvroUtils.bytesToAvro(data.recordBytes, new Schema.Parser().parse(data.getSchema()));

Review comment:
       Why not save `hoodieRecord` to `ERROR_RECORD_RECORD` directly instead of deserializing it here?
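
       A minimal sketch of that idea (assuming `ERROR_RECORD_RECORD` names a string field in `ERROR_TABLE_SCHEMA`; `GenericData` and `Schema` are already imported in this file):

           GenericRecord errorGenericRecord =
               new GenericData.Record(new Schema.Parser().parse(HoodieErrorTableConfig.ERROR_TABLE_SCHEMA));
           // Persist the failed record as-is instead of deserializing its payload first.
           errorGenericRecord.put(ERROR_RECORD_RECORD, hoodieRecord.toString());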

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+      OverwriteWithLatestAvroSchemaPayload data = (OverwriteWithLatestAvroSchemaPayload) hoodieRecord.getData();

Review comment:
       This cast looks problematic. What if the user configured a different payload class? It would fail with a `ClassCastException` at runtime.
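
       One payload-agnostic alternative (a sketch relying only on the generic `HoodieRecordPayload#getInsertValue` contract; `Option` and `Schema` are already imported here, `IndexedRecord` would need an import):

           try {
             // Works for any payload implementation, not just OverwriteWithLatestAvroSchemaPayload.
             HoodieRecordPayload payload = (HoodieRecordPayload) hoodieRecord.getData();
             Option<IndexedRecord> insertValue = payload.getInsertValue(new Schema.Parser().parse(schema));
             genericRecord = insertValue.isPresent() ? (GenericRecord) insertValue.get() : null;
           } catch (IOException e) {
             // handle or rethrow; see the later comments about not swallowing exceptions
           }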

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/error/FlinkHoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,77 @@
+
+package org.apache.hudi.error;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieFlinkWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class FlinkHoodieBackedErrorTableWriter<T extends HoodieRecordPayload> extends
+    HoodieBackedErrorTableWriter<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(FlinkHoodieBackedErrorTableWriter.class);
+
+  public static HoodieBackedErrorTableWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
+    return new FlinkHoodieBackedErrorTableWriter(conf, writeConfig, context);
+  }
+
+  FlinkHoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    super(hadoopConf, writeConfig, engineContext);
+  }
+
+  @Override
+  protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
+
+    try {
+      bootstrapErrorTable(datasetMetaClient);
+    } catch (IOException e) {
+      LOG.error("init error table fail", e);
+    }

Review comment:
       If this fails, don't you want to propagate the exception and fail the whole write loudly? Silently swallowing it looks problematic.
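
       Something like this would surface the failure (a sketch; `HoodieIOException` is Hudi's unchecked wrapper for `IOException`):

           try {
             bootstrapErrorTable(datasetMetaClient);
           } catch (IOException e) {
             // Fail the write loudly rather than continuing with an uninitialized error table.
             throw new HoodieIOException("Failed to bootstrap the error table", e);
           }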

##########
File path: 
hudi-client/hudi-client-common/src/main/java/org/apache/hudi/error/HoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,247 @@
+  protected void bootstrapErrorTable(HoodieTableMetaClient datasetMetaClient) throws IOException {
+
+    if (datasetMetaClient == null) {
+      HoodieTableMetaClient.withPropertyBuilder()
+          .setTableType(HoodieTableType.COPY_ON_WRITE)
+          .setTableName(tableName)
+          .setArchiveLogFolder("archived")
+          .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+          .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+          .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
+    } else {
+      boolean exists = datasetMetaClient.getFs().exists(new Path(errorTableWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME));
+      if (!exists) {
+        HoodieTableMetaClient.withPropertyBuilder()
+            .setTableType(HoodieTableType.COPY_ON_WRITE)
+            .setTableName(tableName)
+            .setArchiveLogFolder("archived")
+            .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
+            .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
+            .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());

Review comment:
       These lines duplicate the ones under the `if` condition; consider extracting the table-init call into a shared helper.
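
       One way to deduplicate (a sketch; the helper name `initErrorTableIfNotExists` is hypothetical):

           private void initErrorTableIfNotExists() throws IOException {
             HoodieTableMetaClient.withPropertyBuilder()
                 .setTableType(HoodieTableType.COPY_ON_WRITE)
                 .setTableName(tableName)
                 .setArchiveLogFolder("archived")
                 .setPayloadClassName(OverwriteWithLatestAvroPayload.class.getName())
                 .setBaseFileFormat(HoodieFileFormat.PARQUET.toString())
                 .initTable(new Configuration(hadoopConf.get()), errorTableWriteConfig.getBasePath());
           }

           protected void bootstrapErrorTable(HoodieTableMetaClient datasetMetaClient) throws IOException {
             // Init when there is no dataset meta client yet, or when the error table's metafolder is missing.
             if (datasetMetaClient == null
                 || !datasetMetaClient.getFs().exists(
                     new Path(errorTableWriteConfig.getBasePath(), HoodieTableMetaClient.METAFOLDER_NAME))) {
               initErrorTableIfNotExists();
             }
           }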

##########
File path: 
hudi-client/hudi-java-client/src/main/java/org/apache/hudi/error/JavaHoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,76 @@
+
+package org.apache.hudi.error;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.HoodieJavaWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+public class JavaHoodieBackedErrorTableWriter<T extends HoodieRecordPayload> extends
+    HoodieBackedErrorTableWriter<T, List<HoodieRecord<T>>, List<HoodieKey>, List<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(JavaHoodieBackedErrorTableWriter.class);
+
+  public static HoodieBackedErrorTableWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext context) {
+    return new JavaHoodieBackedErrorTableWriter(conf, writeConfig, context);
+  }
+
+  JavaHoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    super(hadoopConf, writeConfig, engineContext);
+  }
+
+  @Override
+  protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
+
+    try {
+      bootstrapErrorTable(datasetMetaClient);
+    } catch (IOException e) {
+      LOG.error("init error table fail", e);
+    }

Review comment:
       ditto

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/error/SparkHoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,76 @@
+
+package org.apache.hudi.error;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hudi.client.SparkRDDWriteClient;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.model.HoodieKey;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+
+import java.io.IOException;
+
+public class SparkHoodieBackedErrorTableWriter<T extends HoodieRecordPayload> extends
+    HoodieBackedErrorTableWriter<T, JavaRDD<HoodieRecord<T>>, JavaRDD<HoodieKey>, JavaRDD<WriteStatus>> {
+
+  private static final Logger LOG = LogManager.getLogger(SparkHoodieBackedErrorTableWriter.class);
+
+  public static HoodieBackedErrorTableWriter create(Configuration conf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    return new SparkHoodieBackedErrorTableWriter(conf, writeConfig, engineContext);
+  }
+
+  SparkHoodieBackedErrorTableWriter(Configuration hadoopConf, HoodieWriteConfig writeConfig, HoodieEngineContext engineContext) {
+    super(hadoopConf, writeConfig, engineContext);
+  }
+
+  @Override
+  protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
+
+    try {
+      bootstrapErrorTable(datasetMetaClient);
+    } catch (IOException e) {
+      LOG.error("init error table fail", e);

Review comment:
       Why not invoke `bootstrapErrorTable()` from `initialize()` by default in the parent class? All three engine writers repeat the same body.
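
       For instance, the parent class could carry the shared default, and engines would override only if they need custom behavior (a sketch; rethrowing via `HoodieIOException` is an assumption, following the earlier comment about not swallowing the exception):

           // In HoodieBackedErrorTableWriter: shared default implementation.
           protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
             try {
               bootstrapErrorTable(datasetMetaClient);
             } catch (IOException e) {
               throw new HoodieIOException("Failed to initialize the error table", e);
             }
           }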

##########
File path: 
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/error/SparkHoodieBackedErrorTableWriter.java
##########
@@ -0,0 +1,76 @@
+  @Override
+  protected void initialize(HoodieEngineContext engineContext, HoodieTableMetaClient datasetMetaClient) {
+
+    try {
+      bootstrapErrorTable(datasetMetaClient);
+    } catch (IOException e) {
+      LOG.error("init error table fail", e);

Review comment:
       ditto.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

