nsivabalan commented on a change in pull request #3743:
URL: https://github.com/apache/hudi/pull/3743#discussion_r720970053



##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/RollbackUtils.java
##########
@@ -120,21 +120,24 @@ static HoodieRollbackStat mergeRollbackStat(HoodieRollbackStat stat1, HoodieRoll
    * Generate all rollback requests that we need to perform for rolling back this action without actually performing rolling back for MOR table type.
    *
    * @param instantToRollback Instant to Rollback
-   * @param table instance of {@link HoodieTable} to use.
-   * @param context instance of {@link HoodieEngineContext} to use.
+   * @param metaClient        instance of {@link HoodieTableMetaClient} to use.
+   * @param config            Write config.
+   * @param context           instance of {@link HoodieEngineContext} to use.
+   * @param fileSystemView    File system view.
    * @return list of rollback requests
    */
-  public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(HoodieInstant instantToRollback, HoodieTable table, HoodieEngineContext context) throws IOException {
+  public static List<ListingBasedRollbackRequest> generateRollbackRequestsUsingFileListingMOR(
+      HoodieInstant instantToRollback, HoodieTableMetaClient metaClient, HoodieWriteConfig config,
+      HoodieEngineContext context, TableFileSystemView.SliceView fileSystemView) throws IOException {

Review comment:
       Let's call the last arg sliceView.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -63,7 +63,8 @@ public ListingBasedRollbackStrategy(HoodieTable table,
             table.getMetaClient().getBasePath(), config);
       } else {
         rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
+            .generateRollbackRequestsUsingFileListingMOR(

Review comment:
       May I know if there is any particular reason to pass the slice view from here? Why not pass in the table and let generateAppendRollbackBlocksAction get the slice view from the table?
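
A minimal sketch of the alternative suggested above, assuming the table already owns its config and slice view. `Table`, `SliceView`, `WriteConfig` and `RollbackRequestGenerator` are hypothetical stand-ins, not Hudi classes, and the "requests" are plain strings for illustration only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the table's file-slice view.
interface SliceView {
  List<String> latestFileSlices(String partitionPath);
}

// Hypothetical stand-in for the write config.
interface WriteConfig {
  String basePath();
}

// Hypothetical stand-in for HoodieTable: it already owns config and view.
interface Table {
  WriteConfig config();

  SliceView sliceView();
}

final class RollbackRequestGenerator {
  private RollbackRequestGenerator() {
  }

  // Only the instant, the table and the partitions to scan are passed in;
  // config and slice view are obtained from the table itself.
  static List<String> generateRequests(String instantToRollback, Table table, List<String> partitionPaths) {
    SliceView sliceView = table.sliceView();
    String basePath = table.config().basePath();
    List<String> requests = new ArrayList<>();
    for (String partitionPath : partitionPaths) {
      for (String fileSlice : sliceView.latestFileSlices(partitionPath)) {
        // Each "request" is just a descriptive string in this sketch.
        requests.add(basePath + "/" + partitionPath + "/" + fileSlice + "@" + instantToRollback);
      }
    }
    return requests;
  }
}
```

With this shape, a new dependency that the table already exposes does not have to be threaded through every caller's argument list.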

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
##########
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.table.upgrade;
+
+import org.apache.hudi.common.config.ConfigProperty;
+import org.apache.hudi.common.engine.HoodieEngineContext;
+import org.apache.hudi.common.table.HoodieTableConfig;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.HoodieTableVersion;
+import org.apache.hudi.common.util.FileIOUtils;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpgradeDowngradeException;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Helper class to assist in upgrading/downgrading Hoodie when there is a version change.
+ */
+public class UpgradeDowngrade {
+
+  private static final Logger LOG = LogManager.getLogger(UpgradeDowngrade.class);
+  public static final String HOODIE_UPDATED_PROPERTY_FILE = "hoodie.properties.updated";
+
+  private HoodieTableMetaClient metaClient;
+  protected HoodieWriteConfig config;
+  protected HoodieEngineContext context;
+  private transient FileSystem fs;
+  private Path updatedPropsFilePath;
+  private Path propsFilePath;
+
+  public UpgradeDowngrade(HoodieTableMetaClient metaClient, HoodieWriteConfig config, HoodieEngineContext context) {
+    this.metaClient = metaClient;
+    this.config = config;
+    this.context = context;
+    this.fs = metaClient.getFs();
+    this.updatedPropsFilePath = new Path(metaClient.getMetaPath(), HOODIE_UPDATED_PROPERTY_FILE);
+    this.propsFilePath = new Path(metaClient.getMetaPath(), HoodieTableConfig.HOODIE_PROPERTIES_FILE);
+  }
+
+  public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
+    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
+    // Ensure no inflight commits & versions are same
+    return toVersion.versionCode() != fromVersion.versionCode();
+  }
+
+  /**
+   * Perform Upgrade or Downgrade steps if required and updated table version if need be.
+   * <p>
+   * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
+   * <p>
+   * Essentially, if a dataset was created using any pre 0.6.0(for eg 0.5.3), and Hoodie version was upgraded to 0.6.0,
+   * Hoodie table version gets bumped to 1 and there are some upgrade steps need to be executed before doing any writes.
+   * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded
+   * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes.

Review comment:
       Let's also add a comment that we have 2 versions now: with 0.9.0, we added version 2 as well.
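
For context on that note, the sketch below mirrors the stepwise loops in `run()`: once version 2 exists, moving a table from version 0 to 2 goes through 0 to 1 and then 1 to 2 (and the reverse for a downgrade). `TableVersion` is a hypothetical stand-in for `HoodieTableVersion`, assuming version codes 0, 1 and 2; `VersionPath` is illustrative only.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for HoodieTableVersion, assuming three versions.
enum TableVersion {
  ZERO(0), ONE(1), TWO(2);

  private final int code;

  TableVersion(int code) {
    this.code = code;
  }

  int versionCode() {
    return code;
  }

  static TableVersion versionFromCode(int code) {
    for (TableVersion v : values()) {
      if (v.code == code) {
        return v;
      }
    }
    throw new IllegalArgumentException("Unknown version code: " + code);
  }
}

final class VersionPath {
  private VersionPath() {
  }

  // Returns the ordered single-step hops, e.g. "ZERO->ONE", needed to
  // reach toVersion; an empty list means no upgrade/downgrade is needed.
  static List<String> hops(TableVersion fromVersion, TableVersion toVersion) {
    List<String> hops = new ArrayList<>();
    TableVersion current = fromVersion;
    // +1 for an upgrade, -1 for a downgrade, 0 when versions match.
    int step = Integer.compare(toVersion.versionCode(), current.versionCode());
    while (current != toVersion) {
      TableVersion next = TableVersion.versionFromCode(current.versionCode() + step);
      hops.add(current + "->" + next);
      current = next;
    }
    return hops;
  }
}
```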

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/marker/WriteMarkersFactory.java
##########
@@ -34,24 +36,28 @@
   private static final Logger LOG = LogManager.getLogger(WriteMarkersFactory.class);
 
   /**
-   * @param markerType the type of markers to use
-   * @param table {@code HoodieTable} instance
+   * @param markerType  the type of markers to use
+   * @param metaClient  {@link HoodieTableMetaClient} instance to use
+   * @param config      Write config
+   * @param context     {@link HoodieEngineContext} instance to use
    * @param instantTime current instant time
-   * @return  {@code WriteMarkers} instance based on the {@code MarkerType}
+   * @return {@code WriteMarkers} instance based on the {@code MarkerType}
    */
-  public static WriteMarkers get(MarkerType markerType, HoodieTable table, String instantTime) {
+  public static WriteMarkers get(
+      MarkerType markerType, HoodieTableMetaClient metaClient, HoodieWriteConfig config,

Review comment:
       Can't we get the writeConfig from metaClient itself? Trying to see if we can avoid an extra argument.

##########
File path: 
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/client/FlinkTaskContextSupplier.java
##########
@@ -62,4 +64,9 @@ public RuntimeContext getFlinkRuntimeContext() {
     return Option.empty();
   }
 
+  @Override
+  public String getPartitionColumns(Properties props) {

Review comment:
       I don't think this belongs in the TaskContextSupplier. This is very much product-specific. The task context supplier is meant to hold information about Spark tasks and nothing more. Let's see if we can move it elsewhere.
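
One possible home for this logic is a small stateless helper that reads from the properties directly, keeping the task context supplier limited to task-level information. This is a sketch under assumptions: `PartitionColumns` is hypothetical, and the property key is passed in because the actual key depends on the Hudi/Flink config in use.

```java
import java.util.Arrays;
import java.util.Properties;
import java.util.stream.Collectors;

// Hypothetical engine-agnostic helper, kept out of TaskContextSupplier.
final class PartitionColumns {
  private PartitionColumns() {
  }

  // Reads a comma-separated list of partition fields from props and
  // normalizes it: trims whitespace and drops empty entries.
  static String get(Properties props, String partitionFieldsKey) {
    String raw = props.getProperty(partitionFieldsKey, "");
    return Arrays.stream(raw.split(","))
        .map(String::trim)
        .filter(s -> !s.isEmpty())
        .collect(Collectors.joining(","));
  }
}
```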

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
##########
@@ -0,0 +1,179 @@
+  public boolean needsUpgradeOrDowngrade(HoodieTableVersion toVersion) {
+    HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
+    // Ensure no inflight commits & versions are same

Review comment:
       Fix the comment: this method only checks whether the versions differ; it does not check for inflight commits.

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/action/rollback/ListingBasedRollbackStrategy.java
##########
@@ -63,7 +63,8 @@ public ListingBasedRollbackStrategy(HoodieTable table,
             table.getMetaClient().getBasePath(), config);
       } else {
         rollbackRequests = RollbackUtils
-            .generateRollbackRequestsUsingFileListingMOR(instantToRollback, table, context);
+            .generateRollbackRequestsUsingFileListingMOR(

Review comment:
       There are too many arguments here. Can we see if we can reduce them?

##########
File path: hudi-client/hudi-client-common/src/main/java/org/apache/hudi/table/upgrade/UpgradeDowngrade.java
##########
@@ -0,0 +1,179 @@
+  /**
+   * Perform Upgrade or Downgrade steps if required and updated table version if need be.
+   * <p>
+   * Starting from version 0.6.0, this upgrade/downgrade step will be added in all write paths.
+   * <p>
+   * Essentially, if a dataset was created using any pre 0.6.0(for eg 0.5.3), and Hoodie version was upgraded to 0.6.0,
+   * Hoodie table version gets bumped to 1 and there are some upgrade steps need to be executed before doing any writes.
+   * Similarly, if a dataset was created using Hoodie version 0.6.0 or Hoodie table version 1 and then hoodie was downgraded
+   * to pre 0.6.0 or to Hoodie table version 0, then some downgrade steps need to be executed before proceeding w/ any writes.
+   * <p>
+   * On a high level, these are the steps performed
+   * <p>
+   * Step1 : Understand current hoodie table version and table version from hoodie.properties file
+   * Step2 : Delete any left over .updated from previous upgrade/downgrade
+   * Step3 : If version are different, perform upgrade/downgrade.
+   * Step4 : Copy hoodie.properties -> hoodie.properties.updated with the version updated
+   * Step6 : Rename hoodie.properties.updated to hoodie.properties
+   * </p>
+   *
+   * @param toVersion   version to which upgrade or downgrade has to be done.
+   * @param instantTime current instant time that should not be touched.
+   */
+  public void run(HoodieTableVersion toVersion, String instantTime) {
+    try {
+      // Fetch version from property file and current version
+      HoodieTableVersion fromVersion = metaClient.getTableConfig().getTableVersion();
+      if (!needsUpgradeOrDowngrade(toVersion)) {
+        return;
+      }
+
+      if (fs.exists(updatedPropsFilePath)) {
+        // this can be left over .updated file from a failed attempt before. Many cases exist here.
+        // a) We failed while writing the .updated file and it's content is partial (e.g hdfs)
+        // b) We failed without renaming the file to hoodie.properties. We will re-attempt everything now anyway
+        // c) rename() is not atomic in cloud stores. so hoodie.properties is fine, but we failed before deleting the .updated file
+        // All cases, it simply suffices to delete the file and proceed.
+        LOG.info("Deleting existing .updated file with content :" + FileIOUtils.readAsUTFString(fs.open(updatedPropsFilePath)));
+        fs.delete(updatedPropsFilePath, false);
+      }
+
+      // Perform the actual upgrade/downgrade; this has to be idempotent, for now.
+      LOG.info("Attempting to move table from version " + fromVersion + " to " + toVersion);
+      Map<ConfigProperty, String> tableProps = new HashMap<>();
+      if (fromVersion.versionCode() < toVersion.versionCode()) {
+        // upgrade
+        while (fromVersion.versionCode() < toVersion.versionCode()) {
+          HoodieTableVersion nextVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() + 1);
+          tableProps.putAll(upgrade(fromVersion, nextVersion, instantTime));
+          fromVersion = nextVersion;
+        }
+      } else {
+        // downgrade
+        while (fromVersion.versionCode() > toVersion.versionCode()) {
+          HoodieTableVersion prevVersion = HoodieTableVersion.versionFromCode(fromVersion.versionCode() - 1);
+          tableProps.putAll(downgrade(fromVersion, prevVersion, instantTime));
+          fromVersion = prevVersion;
+        }
+      }
+
+      // Write out the current version in hoodie.properties.updated file
+      for (Map.Entry<ConfigProperty, String> entry : tableProps.entrySet()) {
+        metaClient.getTableConfig().setValue(entry.getKey(), entry.getValue());
+      }
+      metaClient.getTableConfig().setTableVersion(toVersion);
+      createUpdatedFile(metaClient.getTableConfig().getProps());
+
+      // because for different fs the fs.rename have different action,such as:
+      // a) for hdfs : if propsFilePath already exist,fs.rename will not replace propsFilePath, but just return false
+      // b) for localfs: if propsFilePath already exist,fs.rename will replace propsFilePath, and return ture
+      // c) for aliyun ossfs: if propsFilePath already exist,will throw FileAlreadyExistsException
+      // so we should delete the old propsFilePath. also upgrade and downgrade is Idempotent
+      if (fs.exists(propsFilePath)) {
+        fs.delete(propsFilePath, false);
+      }
+      // Rename the .updated file to hoodie.properties. This is atomic in hdfs, but not in cloud stores.
+      // But as long as this does not leave a partial hoodie.properties file, we are okay.
+      fs.rename(updatedPropsFilePath, propsFilePath);
+    } catch (IOException e) {
+      throw new HoodieUpgradeDowngradeException("Error during upgrade/downgrade to version:" + toVersion, e);
+    }
+  }
+
+  private void createUpdatedFile(Properties props) throws IOException {
+    try (FSDataOutputStream outputStream = fs.create(updatedPropsFilePath)) {
+      props.store(outputStream, "Properties saved on " + new Date(System.currentTimeMillis()));
+    }
+  }
+
+  protected Map<ConfigProperty, String> upgrade(HoodieTableVersion fromVersion, HoodieTableVersion toVersion, String instantTime) {
+    if (fromVersion == HoodieTableVersion.ZERO && toVersion == HoodieTableVersion.ONE) {
+      return new ZeroToOneUpgradeHandler().upgrade(metaClient, config, context, instantTime);

Review comment:
       Similar comment as above: can we avoid passing the writeConfig as an extra argument? Since we are refactoring this, I wanted to take this opportunity to fix these.
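
The `hoodie.properties` rewrite at the end of `run()` (discard any leftover `.updated` file, write the full new properties to `hoodie.properties.updated`, delete the old file, then rename) can be sketched on a local directory with `java.nio.file`. This is a stand-in for the Hadoop `FileSystem` calls in the diff, not Hudi code; `PropsFileUpdater` is hypothetical, and the `hoodie.table.version` key used when exercising it is an assumption.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.Properties;

// Hypothetical local-filesystem sketch of the hoodie.properties rewrite.
final class PropsFileUpdater {
  static final String PROPS_FILE = "hoodie.properties";
  static final String UPDATED_PROPS_FILE = "hoodie.properties.updated";

  private PropsFileUpdater() {
  }

  static void update(Path metaDir, Map<String, String> newValues) throws IOException {
    Path propsFile = metaDir.resolve(PROPS_FILE);
    Path updatedFile = metaDir.resolve(UPDATED_PROPS_FILE);

    // A leftover .updated file from a failed attempt is simply discarded.
    Files.deleteIfExists(updatedFile);

    // Load the current properties (if any) and apply the new values.
    Properties props = new Properties();
    if (Files.exists(propsFile)) {
      try (InputStream in = Files.newInputStream(propsFile)) {
        props.load(in);
      }
    }
    newValues.forEach(props::setProperty);

    // Write the complete new content to the .updated file first.
    try (OutputStream out = Files.newOutputStream(updatedFile)) {
      props.store(out, "Properties saved by sketch");
    }

    // Rename behavior differs across file systems when the target exists,
    // so delete the old file explicitly before moving the new one in place.
    Files.deleteIfExists(propsFile);
    Files.move(updatedFile, propsFile);
  }

  static Properties load(Path metaDir) throws IOException {
    Properties props = new Properties();
    try (InputStream in = Files.newInputStream(metaDir.resolve(PROPS_FILE))) {
      props.load(in);
    }
    return props;
  }
}
```

As in the diff, the sequence as a whole is not atomic; what it ensures is that `hoodie.properties` is only ever replaced by a file whose content was written in full beforehand.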



