xushiyan commented on code in PR #5064:
URL: https://github.com/apache/hudi/pull/5064#discussion_r1037912163


##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClient.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.client;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metaserver.thrift.Table;
+
+import java.util.List;
+
+/**
+ * Hoodie meta server client, is to get/put instants, instant meta, snapshot 
from/to hoodie meta server.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieMetaserverClient {
+
+  Table getTable(String db, String tb);
+
+  void createTable(Table table);
+
+  List<HoodieInstant> listInstants(String db, String tb, int commitNum);
+
+  Option<byte[]> getInstantMeta(String db, String tb, HoodieInstant instant);

Review Comment:
   Why not just return `byte[]`? Wrapping it in `Option` is a bit redundant, 
since we can just return an empty array. Also, `instantMetadata` is a more 
accurate name than `instantMeta`.



##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientImp.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.client;
+
+import org.apache.hudi.common.config.HoodieMetaserverConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metaserver.HoodieMetaserver;
+import org.apache.hudi.metaserver.thrift.Table;
+import org.apache.hudi.metaserver.thrift.ThriftHoodieMetaserver;
+import org.apache.hudi.metaserver.util.EntityConversions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * HoodieMetaserverClientImp based on thrift.
+ */
+public class HoodieMetaserverClientImp implements HoodieMetaserverClient, 
AutoCloseable, Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetaserverClientImp.class);
+  private final HoodieMetaserverConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetaserver.Iface client;
+  private TTransport transport;
+
+  public HoodieMetaserverClientImp(HoodieMetaserverConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetaserverUris();
+    if (isLocalEmbeddedMetaserver(uri)) {
+      this.client = HoodieMetaserver.getEmbeddedMetaserver();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();
+    }
+  }
+
+  private void open() {
+    String uri = config.getMetaserverUris();
+    TTransportException exception = null;
+    for (int i = 0; !isConnected && i < retryLimit; i++) {
+      try {
+        URI msUri = URI.create(uri);
+        this.transport = new TSocket(msUri.getHost(), msUri.getPort());
+        this.client = new ThriftHoodieMetaserver.Client(new 
TBinaryProtocol(transport));
+        transport.open();
+        this.isConnected = true;
+        LOG.info("Connected to meta server: " + msUri);
+      } catch (TTransportException e) {
+        exception = e;
+        LOG.warn("Failed to connect to the meta server.", e);
+      }
+    }
+    if (!isConnected) {
+      throw new HoodieException("Fail to connect to the meta server.", 
exception);
+    }
+  }
+
+  private boolean isLocalEmbeddedMetaserver(String uri) {
+    return uri == null || uri.trim().isEmpty();
+  }
+
+  @Override
+  public Table getTable(String db, String tb) {
+    return exceptionWrapper(() -> this.client.getTable(db, tb)).get();
+  }
+
+  @Override
+  public void createTable(Table table) {
+    try {
+      this.client.createTable(table);
+    } catch (TException e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  @Override
+  public List<HoodieInstant> listInstants(String db, String tb, int commitNum) 
{
+    return exceptionWrapper(() -> this.client.listInstants(db, tb, 
commitNum).stream()
+        .map(EntityConversions::fromTHoodieInstant)
+        .collect(Collectors.toList())).get();
+  }
+
+  @Override
+  public Option<byte[]> getInstantMeta(String db, String tb, HoodieInstant 
instant) {
+    ByteBuffer byteBuffer = exceptionWrapper(() -> 
this.client.getInstantMeta(db, tb, 
EntityConversions.toTHoodieInstant(instant))).get();
+    byte[] bytes = new byte[byteBuffer.remaining()];
+    byteBuffer.get(bytes);
+    return byteBuffer.hasRemaining() ? Option.empty() : Option.of(bytes);
+  }
+
+  @Override
+  public String createNewTimestamp(String db, String tb) {
+    return exceptionWrapper(() -> this.client.createNewInstantTime(db, 
tb)).get();
+  }
+
+  @Override
+  public void createNewInstant(String db, String tb, HoodieInstant instant, 
Option<byte[]> content) {

Review Comment:
   Again, using just `byte[]` here would probably make the API cleaner.



##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientProxy.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.client;
+
+import org.apache.hudi.common.config.HoodieMetaserverConfig;
+
+import java.io.Serializable;
+import java.lang.reflect.InvocationHandler;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Proxy;
+import java.lang.reflect.UndeclaredThrowableException;
+
+/**
+ * AOP for meta server client.
+ */
+public class HoodieMetaserverClientProxy implements InvocationHandler, 
Serializable {
+
+  private final HoodieMetaserverClient client;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+
+  private HoodieMetaserverClientProxy(HoodieMetaserverConfig config) {
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    this.client = new HoodieMetaserverClientImp(config);
+  }
+
+  public static HoodieMetaserverClient getProxy(HoodieMetaserverConfig config) 
{
+    HoodieMetaserverClientProxy handler = new 
HoodieMetaserverClientProxy(config);
+    return (HoodieMetaserverClient) 
Proxy.newProxyInstance(HoodieMetaserverClientProxy.class.getClassLoader(),
+        new Class[]{HoodieMetaserverClient.class}, handler);
+  }
+
+  @Override
+  public Object invoke(Object proxy, Method method, Object[] args) throws 
Throwable {
+    int retry = 0;
+    Throwable err;
+    do {
+      try {
+        return method.invoke(client, args);
+      } catch (IllegalAccessException | InvocationTargetException | 
UndeclaredThrowableException e) {
+        throw e.getCause();
+      } catch (Throwable e) {
+        err = e;
+      }
+      retry++;
+      if (retry >= retryLimit) {
+        throw err;
+      }
+      Thread.sleep(retryDelaySeconds * 1000L);
+    } while (true);

Review Comment:
   See whether this can be replaced with `org.apache.hudi.common.util.RetryHelper`.



##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClientImp.java:
##########
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.client;
+
+import org.apache.hudi.common.config.HoodieMetaserverConfig;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.metaserver.HoodieMetaserver;
+import org.apache.hudi.metaserver.thrift.Table;
+import org.apache.hudi.metaserver.thrift.ThriftHoodieMetaserver;
+import org.apache.hudi.metaserver.util.EntityConversions;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.thrift.TException;
+import org.apache.thrift.protocol.TBinaryProtocol;
+import org.apache.thrift.transport.TSocket;
+import org.apache.thrift.transport.TTransport;
+import org.apache.thrift.transport.TTransportException;
+
+import java.io.Serializable;
+import java.net.URI;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.function.Supplier;
+import java.util.stream.Collectors;
+
+/**
+ * HoodieMetaserverClientImp based on thrift.
+ */
+public class HoodieMetaserverClientImp implements HoodieMetaserverClient, 
AutoCloseable, Serializable {
+
+  private static final Logger LOG =  
LogManager.getLogger(HoodieMetaserverClientImp.class);
+  private final HoodieMetaserverConfig config;
+  private final int retryLimit;
+  private final int retryDelaySeconds;
+  private boolean isConnected;
+  private boolean isLocal;
+  private ThriftHoodieMetaserver.Iface client;
+  private TTransport transport;
+
+  public HoodieMetaserverClientImp(HoodieMetaserverConfig config) {
+    this.config = config;
+    this.retryLimit = config.getConnectionRetryLimit();
+    this.retryDelaySeconds = config.getConnectionRetryDelay();
+    String uri = config.getMetaserverUris();
+    if (isLocalEmbeddedMetaserver(uri)) {
+      this.client = HoodieMetaserver.getEmbeddedMetaserver();
+      this.isConnected = true;
+      this.isLocal = true;
+    } else {
+      open();

Review Comment:
   If you avoid calling `open()` in the constructor, you can make `open()` part 
of the public API and then reuse the retry logic in the proxy. It's better to 
separate this clearly: retry logic should reside only in the proxy.



##########
hudi-platform-service/hudi-metaserver/src/main/thrift/hudi-metaserver.thrift:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ namespace java org.apache.hudi.metaserver.thrift
+
+ // table related
+ struct Table {
+   1: string tableName,
+   2: string dbName,

Review Comment:
   Let's use the full name `databaseName` for consistency.



##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/util/EntityConversions.java:
##########
@@ -0,0 +1,116 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.util;
+
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.metaserver.thrift.TAction;
+import org.apache.hudi.metaserver.thrift.THoodieInstant;
+import org.apache.hudi.metaserver.thrift.TState;
+
+import java.util.Locale;
+
+/**
+ * Conversion helpers to convert between hoodie entity and thrift entity.
+ */
+public class EntityConversions {
+
+  public static THoodieInstant toTHoodieInstant(HoodieInstant instant) {
+    return new THoodieInstant(instant.getTimestamp(), 
toTAction(instant.getAction()), toTState(instant.getState()));
+  }
+
+  public static HoodieInstant fromTHoodieInstant(THoodieInstant instant) {
+    return new HoodieInstant(fromTState(instant.getState()), 
fromTAction(instant.getAction()), instant.getTimestamp());
+  }
+
+  public static TAction toTAction(String action) {
+    if (action == null) {
+      return null;
+    }

Review Comment:
   Normally we do not expect an input argument to be null. Are there any 
scenarios where we may encounter null input here? If so, that may indicate a 
design problem in the caller.



##########
hudi-platform-service/hudi-metaserver/src/main/java/org/apache/hudi/metaserver/client/HoodieMetaserverClient.java:
##########
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.metaserver.client;
+
+import org.apache.hudi.ApiMaturityLevel;
+import org.apache.hudi.PublicAPIClass;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.metaserver.thrift.Table;
+
+import java.util.List;
+
+/**
+ * Hoodie meta server client, is to get/put instants, instant meta, snapshot 
from/to hoodie meta server.
+ */
+@PublicAPIClass(maturity = ApiMaturityLevel.EVOLVING)
+public interface HoodieMetaserverClient {

Review Comment:
   A "Client" is generally "AutoCloseable".



##########
hudi-platform-service/hudi-metaserver/src/main/thrift/hudi-metaserver.thrift:
##########
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+ namespace java org.apache.hudi.metaserver.thrift
+
+ // table related
+ struct Table {
+   1: string tableName,
+   2: string dbName,
+   3: string owner,
+   4: i32 createTime,

Review Comment:
   In TableBean this is a string?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to