This is an automated email from the ASF dual-hosted git repository.

danny0405 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f8630cc57 [HUDI-4167] Remove the timeline refresh with initializing hoodie table (#5716)
7f8630cc57 is described below

commit 7f8630cc57fbb9d29e8dc7ca87b582264da073fd
Author: Danny Chan <[email protected]>
AuthorDate: Thu Jun 2 09:48:48 2022 +0800

    [HUDI-4167] Remove the timeline refresh with initializing hoodie table (#5716)
    
    The timeline refresh on table initialization invokes the fs view #sync,
    which currently has two actions:
    
    1. reload the timeline of the fs view, so that the next fs view request
       is based on this timeline metadata
    2. if this is a local fs view, clear all the local states; if this is a
       remote fs view, send a request to sync the remote fs view
    
    But looking at the construction, the meta client is instantiated freshly,
    so the timeline is already the latest; the table is also constructed
    freshly, so the fs view has no local states. That means the #sync is
    totally unnecessary.
    
    In this patch, the metadata lifecycle and the data set fs view are kept
    in sync: when the fs view is refreshed, the underlying metadata is also
    refreshed synchronously. The freshness of the metadata follows the same
    rules as the data fs view:
    
    1. if the fs view is local, the visibility is based on the latest commit
       of the client's table meta client
    2. if the fs view is remote, the timeline server #syncs the fs view and
       the metadata together based on the lagging server-local timeline
    
    From the client's perspective, there is no need to care about the
    refresh action anymore, no matter whether the metadata table is enabled
    or not. That makes the client logic clearer and less error-prone.
    
    Removing the timeline refresh has another benefit: it avoids unnecessary
    #refresh calls on the remote fs view. If every client sent a request to
    #sync the remote fs view, the server would encounter conflicts and the
    clients would receive response errors.
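    
    As an illustration (not part of the commit), a minimal sketch of a call
    site after this change; the base path and table name are hypothetical
    placeholders, while the create() signature matches the diff below:
    
        import org.apache.hudi.client.common.HoodieSparkEngineContext;
        import org.apache.hudi.config.HoodieWriteConfig;
        import org.apache.hudi.table.HoodieSparkTable;
        
        public class CreateTableSketch {
          public static HoodieSparkTable<?> open(HoodieSparkEngineContext context) {
            HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
                .withPath("/tmp/hoodie/sketch_table") // hypothetical base path
                .forTable("sketch_table")             // hypothetical table name
                .build();
            // Before this patch: HoodieSparkTable.create(config, context, /* refreshTimeline */ true);
            // Now the flag is gone; the fs view and its metadata stay in sync internally.
            return HoodieSparkTable.create(config, context);
          }
        }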
---
 .../org/apache/hudi/cli/commands/SparkMain.java    |  2 +-
 .../apache/hudi/client/BaseHoodieWriteClient.java  | 28 ++++++++++------------
 .../org/apache/hudi/config/HoodieWriteConfig.java  |  9 +++++--
 .../apache/hudi/client/HoodieFlinkWriteClient.java |  3 +--
 .../org/apache/hudi/table/HoodieFlinkTable.java    | 10 --------
 .../index/bloom/TestFlinkHoodieBloomIndex.java     |  2 +-
 .../apache/hudi/client/HoodieJavaWriteClient.java  |  4 +---
 .../apache/hudi/client/SparkRDDWriteClient.java    | 12 ++++------
 .../org/apache/hudi/table/HoodieSparkTable.java    | 17 +------------
 .../hudi/client/TestTableSchemaEvolution.java      |  8 ++++++-
 .../functional/TestHoodieBackedTableMetadata.java  |  2 +-
 .../hudi/table/TestHoodieMergeOnReadTable.java     |  2 +-
 .../hudi/testutils/HoodieClientTestHarness.java    |  2 +-
 .../table/view/RocksDbBasedFileSystemView.java     |  1 +
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 21 ++++++++++++----
 .../hudi/timeline/service/TimelineService.java     |  3 ++-
 16 files changed, 59 insertions(+), 67 deletions(-)

diff --git a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
index 9fe83f1995..43fe168587 100644
--- a/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
+++ b/hudi-cli/src/main/java/org/apache/hudi/cli/commands/SparkMain.java
@@ -296,7 +296,7 @@ public class SparkMain {
       SparkRDDWriteClient client = createHoodieClient(jsc, basePath, false);
       HoodieWriteConfig config = client.getConfig();
       HoodieEngineContext context = client.getEngineContext();
-      HoodieSparkTable table = HoodieSparkTable.create(config, context, true);
+      HoodieSparkTable table = HoodieSparkTable.create(config, context);
       WriteMarkersFactory.get(config.getMarkersType(), table, instantTime)
           .quietDeleteMarkerDir(context, config.getMarkersDeleteParallelism());
       return 0;
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
index 5c485bed05..455cb644c7 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/client/BaseHoodieWriteClient.java
@@ -296,11 +296,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     }
   }
 
-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
-    return createTable(config, hadoopConf, false);
-  }
-
-  protected abstract HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf, boolean refreshTimeline);
+  protected abstract HoodieTable<T, I, K, O> createTable(HoodieWriteConfig config, Configuration hadoopConf);
 
   void emitCommitMetrics(String instantTime, HoodieCommitMetadata metadata, String actionType) {
     try {
@@ -365,7 +361,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    */
   protected void rollbackFailedBootstrap() {
     LOG.info("Rolling back pending bootstrap if present");
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     HoodieTimeline inflightTimeline = table.getMetaClient().getCommitsTimeline().filterPendingExcludingCompaction();
     Option<String> instant = Option.fromJavaOptional(
         inflightTimeline.getReverseOrderedInstants().map(HoodieInstant::getTimestamp).findFirst());
@@ -634,7 +630,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * Run any pending compactions.
    */
   public void runAnyPendingCompactions() {
-    runAnyPendingCompactions(createTable(config, hadoopConf, config.isMetadataTableEnabled()));
+    runAnyPendingCompactions(createTable(config, hadoopConf));
   }
 
   /**
@@ -644,7 +640,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String user, String comment) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline is empty");
     }
@@ -668,7 +664,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @param comment - Comment for the savepoint
    */
   public void savepoint(String instantTime, String user, String comment) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     table.savepoint(context, instantTime, user, comment);
   }
 
@@ -680,7 +676,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return true if the savepoint was deleted successfully
    */
   public void deleteSavepoint(String savepointTime) {
-    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf, config.isMetadataTableEnabled());
+    HoodieTable<T, I, K, O> table = createTable(config, hadoopConf);
     SavepointHelpers.deleteSavepoint(table, savepointTime);
   }
 
@@ -1012,7 +1008,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    */
   public Option<String> scheduleIndexing(List<MetadataPartitionType> partitionTypes) {
     String instantTime = HoodieActiveTimeline.createNewInstantTime();
-    Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+    Option<HoodieIndexPlan> indexPlan = createTable(config, hadoopConf)
         .scheduleIndexing(context, instantTime, partitionTypes);
     return indexPlan.isPresent() ? Option.of(instantTime) : Option.empty();
   }
@@ -1024,7 +1020,7 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
    * @return {@link Option<HoodieIndexCommitMetadata>} after successful indexing.
    */
   public Option<HoodieIndexCommitMetadata> index(String indexInstantTime) {
-    return createTable(config, hadoopConf, config.isMetadataTableEnabled()).index(context, indexInstantTime);
+    return createTable(config, hadoopConf).index(context, indexInstantTime);
   }
 
   /**
@@ -1339,17 +1335,17 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
         return Option.empty();
       case CLUSTER:
         LOG.info("Scheduling clustering at instant time :" + instantTime);
-        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieClusteringPlan> clusteringPlan = createTable(config, hadoopConf)
             .scheduleClustering(context, instantTime, extraMetadata);
         return clusteringPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case COMPACT:
         LOG.info("Scheduling compaction at instant time :" + instantTime);
-        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieCompactionPlan> compactionPlan = createTable(config, hadoopConf)
             .scheduleCompaction(context, instantTime, extraMetadata);
         return compactionPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       case CLEAN:
         LOG.info("Scheduling cleaning at instant time :" + instantTime);
-        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf, config.isMetadataTableEnabled())
+        Option<HoodieCleanerPlan> cleanerPlan = createTable(config, hadoopConf)
             .scheduleCleaning(context, instantTime, extraMetadata);
         return cleanerPlan.isPresent() ? Option.of(instantTime) : Option.empty();
       default:
@@ -1702,6 +1698,6 @@ public abstract class BaseHoodieWriteClient<T extends HoodieRecordPayload, I, K,
     // try to save history schemas
     FileBasedInternalSchemaStorageManager schemasManager = new FileBasedInternalSchemaStorageManager(metaClient);
     schemasManager.persistHistorySchemaStr(instantTime, SerDeHelper.inheritSchemas(newSchema, historySchemaStr));
-    commitStats(instantTime, Collections.EMPTY_LIST, Option.of(extraMeta), commitActionType);
+    commitStats(instantTime, Collections.emptyList(), Option.of(extraMeta), commitActionType);
   }
 }
diff --git a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
index 42208a0734..1603965ea9 100644
--- a/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
+++ b/hudi-client/hudi-client-common/src/main/java/org/apache/hudi/config/HoodieWriteConfig.java
@@ -364,8 +364,8 @@ public class HoodieWriteConfig extends HoodieConfig {
 
   public static final ConfigProperty<Boolean> REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT = ConfigProperty
       .key("hoodie.refresh.timeline.server.based.on.latest.commit")
-      .defaultValue(false)
-      .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (false), ");
+      .defaultValue(true)
+      .withDocumentation("Refresh timeline in timeline server based on latest commit apart from timeline hash difference. By default (true).");
 
   public static final ConfigProperty<Long> INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = ConfigProperty
       .key("hoodie.consistency.check.initial_interval_ms")
@@ -2499,6 +2499,11 @@ public class HoodieWriteConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder withRefreshTimelineServerBasedOnLatestCommit(boolean refreshTimelineServerBasedOnLatestCommit) {
+      writeConfig.setValue(REFRESH_TIMELINE_SERVER_BASED_ON_LATEST_COMMIT, Boolean.toString(refreshTimelineServerBasedOnLatestCommit));
+      return this;
+    }
+
     protected void setDefaults() {
       writeConfig.setDefaultValue(MARKERS_TYPE, getDefaultMarkersType(engineType));
       // Check for mandatory properties
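
As a usage sketch (not part of the commit): the new builder option above can be set explicitly, for example by tests that need the old behavior; everything except withRefreshTimelineServerBasedOnLatestCommit is an assumed placeholder:

    HoodieWriteConfig config = HoodieWriteConfig.newBuilder()
        .withPath("/tmp/hoodie/sketch_table") // hypothetical base path
        .forTable("sketch_table")             // hypothetical table name
        // New in this patch, default true. Setting false restores the old
        // behavior of refreshing only on a timeline hash difference.
        .withRefreshTimelineServerBasedOnLatestCommit(false)
        .build();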
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
index ddfbabaf36..b68cf97e9a 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/HoodieFlinkWriteClient.java
@@ -117,8 +117,7 @@ public class HoodieFlinkWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieFlinkTable.create(config, (HoodieFlinkEngineContext) context);
   }
 
diff --git a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
index 6eae15e7e1..26149918c6 100644
--- a/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
+++ b/hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/table/HoodieFlinkTable.java
@@ -62,13 +62,6 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
   public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
                                                                            HoodieFlinkEngineContext context,
                                                                            HoodieTableMetaClient metaClient) {
-    return HoodieFlinkTable.create(config, context, metaClient, config.isMetadataTableEnabled());
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieFlinkTable<T> create(HoodieWriteConfig config,
-                                                                           HoodieFlinkEngineContext context,
-                                                                           HoodieTableMetaClient metaClient,
-                                                                           boolean refreshTimeline) {
    final HoodieFlinkTable<T> hoodieFlinkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -80,9 +73,6 @@ public abstract class HoodieFlinkTable<T extends HoodieRecordPayload>
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieFlinkTable.getHoodieView().sync();
-    }
     return hoodieFlinkTable;
   }
 
diff --git a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
index 50adabbd58..e23ee4ad58 100644
--- a/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
+++ b/hudi-client/hudi-flink-client/src/test/java/org/apache/hudi/index/bloom/TestFlinkHoodieBloomIndex.java
@@ -104,7 +104,7 @@ public class TestFlinkHoodieBloomIndex extends HoodieFlinkClientTestHarness {
   public void testLoadInvolvedFiles(boolean rangePruning, boolean treeFiltering, boolean bucketizedChecking) throws Exception {
     HoodieWriteConfig config = makeConfig(rangePruning, treeFiltering, bucketizedChecking);
     HoodieBloomIndex index = new HoodieBloomIndex(config, ListBasedHoodieBloomIndexHelper.getInstance());
-    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient, false);
+    HoodieTable hoodieTable = HoodieFlinkTable.create(config, context, metaClient);
     HoodieFlinkWriteableTestTable testTable = HoodieFlinkWriteableTestTable.of(hoodieTable, SCHEMA);
 
     // Create some partitions, and put some files
diff --git a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
index 7f5dc19baf..fbfb85bab3 100644
--- a/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
+++ b/hudi-client/hudi-java-client/src/main/java/org/apache/hudi/client/HoodieJavaWriteClient.java
@@ -89,9 +89,7 @@ public class HoodieJavaWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
     return HoodieJavaTable.create(config, context);
   }
 
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
index 7f9ec05e3c..fe6ea975e3 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/client/SparkRDDWriteClient.java
@@ -123,10 +123,8 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
   }
 
   @Override
-  protected HoodieTable createTable(HoodieWriteConfig config,
-                                    Configuration hadoopConf,
-                                    boolean refreshTimeline) {
-    return HoodieSparkTable.create(config, context, refreshTimeline);
+  protected HoodieTable createTable(HoodieWriteConfig config, Configuration hadoopConf) {
+    return HoodieSparkTable.create(config, context);
   }
 
   @Override
@@ -333,7 +331,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   protected HoodieWriteMetadata<JavaRDD<WriteStatus>> compact(String compactionInstantTime, boolean shouldComplete) {
-    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, true);
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(compactionInstantTime, WriteOperationType.COMPACT, table.getMetaClient());
     HoodieTimeline pendingCompactionTimeline = table.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -352,7 +350,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
 
   @Override
   public HoodieWriteMetadata<JavaRDD<WriteStatus>> cluster(String clusteringInstant, boolean shouldComplete) {
-    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context, config.isMetadataTableEnabled());
+    HoodieSparkTable<T> table = HoodieSparkTable.create(config, context);
     preWrite(clusteringInstant, WriteOperationType.CLUSTER, table.getMetaClient());
     HoodieTimeline pendingClusteringTimeline = table.getActiveTimeline().filterPendingReplaceTimeline();
     HoodieInstant inflightInstant = HoodieTimeline.getReplaceCommitInflightInstant(clusteringInstant);
@@ -434,7 +432,7 @@ public class SparkRDDWriteClient<T extends HoodieRecordPayload> extends
     }
 
     // Create a Hoodie table which encapsulated the commits and files visible
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, config.isMetadataTableEnabled());
+    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
   /**
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
index 20e3bd4c14..66d51c9128 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/table/HoodieSparkTable.java
@@ -54,30 +54,18 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context) {
-    return create(config, context, false);
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config, HoodieEngineContext context,
-                                                                           boolean refreshTimeline) {
     HoodieTableMetaClient metaClient =
         HoodieTableMetaClient.builder().setConf(context.getHadoopConf().get()).setBasePath(config.getBasePath())
             .setLoadActiveTimelineOnLoad(true).setConsistencyGuardConfig(config.getConsistencyGuardConfig())
             .setLayoutVersion(Option.of(new TimelineLayoutVersion(config.getTimelineLayoutVersion())))
             .setFileSystemRetryConfig(config.getFileSystemRetryConfig())
             .setProperties(config.getProps()).build();
-    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient, refreshTimeline);
+    return HoodieSparkTable.create(config, (HoodieSparkEngineContext) context, metaClient);
   }
 
   public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
                                                                            HoodieSparkEngineContext context,
                                                                            HoodieTableMetaClient metaClient) {
-    return create(config, context, metaClient, false);
-  }
-
-  public static <T extends HoodieRecordPayload> HoodieSparkTable<T> create(HoodieWriteConfig config,
-                                                                           HoodieSparkEngineContext context,
-                                                                           HoodieTableMetaClient metaClient,
-                                                                           boolean refreshTimeline) {
     HoodieSparkTable<T> hoodieSparkTable;
     switch (metaClient.getTableType()) {
       case COPY_ON_WRITE:
@@ -89,9 +77,6 @@ public abstract class HoodieSparkTable<T extends HoodieRecordPayload>
       default:
         throw new HoodieException("Unsupported table type :" + metaClient.getTableType());
     }
-    if (refreshTimeline) {
-      hoodieSparkTable.getHoodieView().sync();
-    }
     return hoodieSparkTable;
   }
 
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
index 1cb7bcbfc4..98bcb11033 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/TestTableSchemaEvolution.java
@@ -515,7 +515,13 @@ public class TestTableSchemaEvolution extends HoodieClientTestBase {
     return getConfigBuilder(schema)
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.INMEMORY).build())
         .withCompactionConfig(HoodieCompactionConfig.newBuilder().withMaxNumDeltaCommitsBeforeCompaction(1).build())
-        .withAvroSchemaValidate(true);
+        .withAvroSchemaValidate(true)
+        // The test has rollback instants on the timeline,
+        // these rollback instants use real time as instant time, whose instant time is always greater than
+        // the normal commits instant time, this breaks the refresh rule introduced in HUDI-2761:
+        // The last client instant is always the rollback instant but not the normal commit.
+        // Always refresh the timeline when client and server have different timeline.
+        .withRefreshTimelineServerBasedOnLatestCommit(false);
   }
 
   @Override
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
index 9a8fc55a20..e19c8fc1a2 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedTableMetadata.java
@@ -111,7 +111,7 @@ public class TestHoodieBackedTableMetadata extends TestHoodieMetadataBase {
     assertEquals(fsPartitions, metadataPartitions, "Partitions should match");
 
     // Files within each partition should match
-    HoodieTable table = HoodieSparkTable.create(writeConfig, context, true);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, context);
     TableFileSystemView tableView = table.getHoodieView();
     List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
     Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
index b9f025223b..0ce6ca0ee9 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/table/TestHoodieMergeOnReadTable.java
@@ -247,7 +247,7 @@ public class TestHoodieMergeOnReadTable extends SparkClientFunctionalTestHarness
       assertEquals(allPartitions.size(), testTable.listAllBaseFiles().length);
 
       // Verify that all data file has one log file
-      HoodieTable table = HoodieSparkTable.create(config, context(), metaClient, true);
+      HoodieTable table = HoodieSparkTable.create(config, context(), metaClient);
       for (String partitionPath : dataGen.getPartitionPaths()) {
         List<FileSlice> groupedLogFiles =
             table.getSliceView().getLatestFileSlices(partitionPath).collect(Collectors.toList());
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
index 4504c552c9..d0365dced1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/testutils/HoodieClientTestHarness.java
@@ -559,7 +559,7 @@ public abstract class HoodieClientTestHarness extends HoodieCommonTestHarness im
 
     // Files within each partition should match
     metaClient = HoodieTableMetaClient.reload(metaClient);
-    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext, true);
+    HoodieTable table = HoodieSparkTable.create(writeConfig, engineContext);
     TableFileSystemView tableView = table.getHoodieView();
     List<String> fullPartitionPaths = fsPartitions.stream().map(partition -> basePath + "/" + partition).collect(Collectors.toList());
     Map<String, FileStatus[]> partitionToFilesMap = tableMetadata.getAllFilesInPartitions(fullPartitionPaths);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
index af0dc13016..02a406e7e0 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/view/RocksDbBasedFileSystemView.java
@@ -199,6 +199,7 @@ public class RocksDbBasedFileSystemView extends IncrementalTimelineSyncFileSyste
     LOG.info("Deleting all rocksdb data associated with table filesystem view");
     rocksDB.close();
     rocksDB = new RocksDBDAO(metaClient.getBasePath(), config.getRocksdbBasePath());
+    schemaHelper.getAllColumnFamilies().forEach(rocksDB::addColumnFamily);
   }
 
   @Override
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index cf941bb70c..e8937b39dc 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -550,10 +550,7 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
 
   @Override
   public void close() {
-    for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
-      close(partitionFileSlicePair);
-    }
-    partitionReaders.clear();
+    closePartitionReaders();
   }
 
   /**
@@ -567,6 +564,16 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
     closeReader(readers);
   }
 
+  /**
+   * Close and clear all the partitions readers.
+   */
+  private void closePartitionReaders() {
+    for (Pair<String, String> partitionFileSlicePair : partitionReaders.keySet()) {
+      close(partitionFileSlicePair);
+    }
+    partitionReaders.clear();
+  }
+
   private void closeReader(Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers) {
     if (readers != null) {
       try {
@@ -624,5 +631,11 @@ public class HoodieBackedTableMetadata extends BaseTableMetadata {
   public void reset() {
     initIfNeeded();
     dataMetaClient.reloadActiveTimeline();
+    if (metadataMetaClient != null) {
+      metadataMetaClient.reloadActiveTimeline();
+    }
+    // the cached reader has max instant time restriction, they should be cleared
+    // because the metadata timeline may have changed.
+    closePartitionReaders();
   }
 }
diff --git a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
index 40669f50e4..2ff2168221 100644
--- a/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
+++ b/hudi-timeline-service/src/main/java/org/apache/hudi/timeline/service/TimelineService.java
@@ -150,7 +150,7 @@ public class TimelineService {
       private int markerBatchNumThreads = 20;
       private long markerBatchIntervalMs = 50L;
       private int markerParallelism = 100;
-      private boolean refreshTimelineBasedOnLatestCommit = false;
+      private boolean refreshTimelineBasedOnLatestCommit = true;
 
       public Builder() {
       }
@@ -240,6 +240,7 @@ public class TimelineService {
         config.markerBatchNumThreads = this.markerBatchNumThreads;
         config.markerBatchIntervalMs = this.markerBatchIntervalMs;
         config.markerParallelism = this.markerParallelism;
+        config.refreshTimelineBasedOnLatestCommit = this.refreshTimelineBasedOnLatestCommit;
         return config;
       }
     }
