ayushtkn commented on code in PR #427: URL: https://github.com/apache/tez/pull/427#discussion_r2609634048
########## tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client-side interface for discovering Application Master (AM) instances + * registered in the AM registry. + * + * <p>Implementations are responsible for locating AM endpoints and returning + * their metadata. This API is used by client components to discover running + * Tez AMs.</p> + * + * <p>Listeners may be registered to receive notifications when AM records + * appear or are removed.</p> + */ +public abstract class AMRegistryClient implements Closeable { Review Comment: can we do ``implements AutoCloseable``? 
########## tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java: ########## @@ -125,9 +154,11 @@ public boolean equals(Object other) { } if (other instanceof AMRecord otherRecord) { return appId.equals(otherRecord.appId) - && host.equals(otherRecord.host) + && hostName.equals(otherRecord.hostName) Review Comment: Can this lead to an NPE? We don't have any default here: ``` this.hostName = serviceRecord.get(HOST_NAME_RECORD_KEY); ``` ########## tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java: ########## @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.client.registry.zookeeper; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; +import org.apache.tez.client.FrameworkClient; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.dag.api.TezConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +public class ZkFrameworkClient extends FrameworkClient { + + private AMRecord amRecord; + private ZkAMRegistryClient amRegistryClient = null; + private volatile boolean isRunning = false; + private String amHost; + private int amPort; + + @Override + public synchronized void init(TezConfiguration tezConf) { + if (amRegistryClient == null) { + try { + amRegistryClient = ZkAMRegistryClient.getClient(tezConf); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } + + @Override + public void start() { + try { + amRegistryClient.start(); Review Comment: we should return early if `isRunning` is `true` here ########## tez-api/src/main/java/org/apache/tez/client/registry/AMRecord.java: ########## @@ -148,16 +179,27 @@ public boolean equals(Object other) { * @return a {@link ServiceRecord} populated with the values of this {@code AMRecord} */ public ServiceRecord toServiceRecord() { - ServiceRecord serviceRecord = new ServiceRecord(); + if (serviceRecord != null) { + return serviceRecord; + } + serviceRecord = new ServiceRecord(); serviceRecord.set(APP_ID_RECORD_KEY, appId); - serviceRecord.set(HOST_RECORD_KEY, 
host); + serviceRecord.set(HOST_NAME_RECORD_KEY, hostName); + serviceRecord.set(HOST_IP_RECORD_KEY, hostIp); serviceRecord.set(PORT_RECORD_KEY, port); - serviceRecord.set(OPAQUE_ID_KEY, id); + serviceRecord.set(EXTERNAL_ID_KEY, externalId); + serviceRecord.set(COMPUTE_GROUP_NAME_KEY, computeName); + return serviceRecord; } + @Override + public String toString() { + return toServiceRecord().attributes().toString(); Review Comment: `toString` calls `toServiceRecord`, so it can end up initializing and caching ``serviceRecord = new ServiceRecord();``. Are we OK with that? This could lead to issues later: for example, if someone logs the record before all the values are set, the incomplete service record stays cached as well. ########## tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java: ########## @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + + +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +/** + * Base class for {@code AMRegistry} implementations. 
+ * + * <p>The specific implementation is configured via the + * {@code tez.am.registry.class} property.</p> + * + * <p>Implementations are expected to provide appropriate service lifecycle + * behavior, including: + * <ul> + * <li>{@code init}</li> + * <li>{@code serviceStart}</li> + * <li>{@code serviceStop}</li> + * </ul> + * </p> + */ +public interface AMRegistry extends AutoCloseable { + + void add(AMRecord server) throws Exception; + + void remove(AMRecord server) throws Exception; Review Comment: should it be `server` or `record` ########## tez-api/src/test/java/org/apache/tez/client/TestTezClient.java: ########## @@ -447,6 +457,97 @@ public TezClientForTest testTezClient(boolean isSession, boolean shouldStop, Str return client; } + public void testTezClientReconnect(boolean isSession) throws Exception { + //Setup 1 + Map<String, LocalResource> lrs = Maps.newHashMap(); + String lrName1 = "LR1"; + lrs.put(lrName1, LocalResource.newInstance(URL.newInstance("file", "localhost", 0, "/test"), + LocalResourceType.FILE, LocalResourceVisibility.PUBLIC, 1, 1)); + + //Client 1 + TezClientForTest client = configureAndCreateTezClient(lrs, isSession, null); + + //Submission Context 1 + ArgumentCaptor<ApplicationSubmissionContext> captor = ArgumentCaptor.forClass(ApplicationSubmissionContext.class); + when(client.mockYarnClient.getApplicationReport(client.mockAppId).getYarnApplicationState()) + .thenReturn(YarnApplicationState.RUNNING); + + //Client 1 start + client.start(); Review Comment: missing close? ########## tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client-side interface for discovering Application Master (AM) instances + * registered in the AM registry. + * + * <p>Implementations are responsible for locating AM endpoints and returning + * their metadata. This API is used by client components to discover running + * Tez AMs.</p> + * + * <p>Listeners may be registered to receive notifications when AM records + * appear or are removed.</p> + */ +public abstract class AMRegistryClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(AMRegistryClient.class); + + private final List<AMRegistryClientListener> listeners = new ArrayList<>(); + + /** + * Lookup AM metadata for the given application ID. + * + * @param appId the application ID + * @return the AM record if found, otherwise {@code null} + * @throws IOException if the lookup fails + */ + public abstract AMRecord getRecord(ApplicationId appId) throws IOException; + + /** + * Retrieve all AM records known in the registry. 
+ * + * @return a list of AM records (possibly empty) + * @throws IOException if the fetch fails + */ + public abstract List<AMRecord> getAllRecords() throws IOException; + + /** + * Register a listener for AM registry events. + * The listener will be notified when AM records are added or removed. + * + * @param listener the listener to add + */ + public synchronized void addListener(AMRegistryClientListener listener) { + listeners.add(listener); + } + + /** + * Notify listeners of a newly added AM record. + * + * @param record the added AM record + */ + protected synchronized void notifyOnAdded(AMRecord record) { + for (AMRegistryClientListener listener : listeners) { + try { + listener.onAdd(record); + } catch (Exception e) { + LOG.warn("Exception while calling AM add listener, AM record {}", record, e); + } + } + } + + /** + * Notify listeners of an updated AM record. + * + * @param record the updated AM record + */ + protected synchronized void notifyOnUpdated(AMRecord record) { + for (AMRegistryClientListener listener : listeners) { + try { + listener.onUpdate(record); + } catch (Exception e) { + LOG.warn("Exception while calling AM update listener, AM record {}", record, e); Review Comment: nit can we ``` "Exception while notifying AM update listener for record {}" ``` ########## tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java: ########## @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry.zookeeper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.client.registry.AMRegistryClient; +import org.apache.tez.client.registry.AMRegistryUtils; +import org.apache.tez.dag.api.TezConfiguration; + +import com.fasterxml.jackson.core.JsonParseException; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Curator/Zookeeper implementation of {@link AMRegistryClient}. 
+ */ [email protected] +public final class ZkAMRegistryClient extends AMRegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class); + private static final Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>(); + + private final Configuration conf; + // Cache of known AMs + private final ConcurrentHashMap<ApplicationId, AMRecord> amRecordCache = new ConcurrentHashMap<>(); + + private CuratorFramework client; + private TreeCache cache; + private ZkRegistryListener listener; + + private ZkAMRegistryClient(final Configuration conf) { + this.conf = conf; + } + + public static synchronized ZkAMRegistryClient getClient(final Configuration conf) { + String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE); + ZkAMRegistryClient registry = INSTANCES.get(namespace); + if (registry == null) { + registry = new ZkAMRegistryClient(conf); + INSTANCES.put(namespace, registry); + } + LOG.info("Returning tez AM registry ({}) for namespace '{}'", System.identityHashCode(registry), namespace); + return registry; + } + + /** + * Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord} + * for caching. + * + * @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord} + * @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null} + * if no data is present + * @throws IOException if the data cannot be deserialized into a {@link ServiceRecord} + */ + public static AMRecord getAMRecord(final ChildData childData) throws IOException { + // Not a leaf path. Only leaf path contains AMRecord. 
+ if (!childData.getPath().contains(ApplicationId.appIdStrPrefix)) { + return null; + } + byte[] data = childData.getData(); + // only the path appeared, there is no data yet + if (data.length == 0) { + return null; + } + String value = new String(data, StandardCharsets.UTF_8); + try { + return AMRegistryUtils.jsonStringToRecord(value); + } catch (JsonParseException e) { + //Not a json AMRecord (SRV), could be some other data. + LOG.warn("Non-json data received while de-serializing AMRecord: {}. Ignoring...", value); + return null; + } + } + + public void start() throws Exception { + ZkConfig zkConf = new ZkConfig(this.conf); + client = zkConf.createCuratorFramework(); + cache = new TreeCache(client, zkConf.getZkNamespace()); + client.start(); + cache.start(); Review Comment: shouldn't this be ``` client.start(); client.blockUntilConnected(); cache.start(); ``` I think cache.start() normally expects the client to already be started and with an active connection. ########## tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkFrameworkClient.java: ########## @@ -0,0 +1,170 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.client.registry.zookeeper; + +import java.io.IOException; + +import org.apache.hadoop.yarn.api.protocolrecords.GetNewApplicationResponse; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.api.records.FinalApplicationStatus; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.client.api.YarnClientApplication; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.util.Records; +import org.apache.tez.client.FrameworkClient; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.dag.api.TezConfiguration; + +import com.google.common.annotations.VisibleForTesting; + +public class ZkFrameworkClient extends FrameworkClient { + + private AMRecord amRecord; Review Comment: should this be `volatile` ########## tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java: ########## @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tez.client.registry.zookeeper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.client.registry.AMRegistryClient; +import org.apache.tez.client.registry.AMRegistryUtils; +import org.apache.tez.dag.api.TezConfiguration; + +import com.fasterxml.jackson.core.JsonParseException; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Curator/Zookeeper implementation of {@link AMRegistryClient}. 
+ */ [email protected] +public final class ZkAMRegistryClient extends AMRegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class); + private static final Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>(); + + private final Configuration conf; + // Cache of known AMs + private final ConcurrentHashMap<ApplicationId, AMRecord> amRecordCache = new ConcurrentHashMap<>(); + + private CuratorFramework client; + private TreeCache cache; + private ZkRegistryListener listener; + + private ZkAMRegistryClient(final Configuration conf) { + this.conf = conf; + } + + public static synchronized ZkAMRegistryClient getClient(final Configuration conf) { + String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE); + ZkAMRegistryClient registry = INSTANCES.get(namespace); + if (registry == null) { + registry = new ZkAMRegistryClient(conf); + INSTANCES.put(namespace, registry); + } + LOG.info("Returning tez AM registry ({}) for namespace '{}'", System.identityHashCode(registry), namespace); + return registry; + } + + /** + * Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord} + * for caching. + * + * @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord} + * @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null} + * if no data is present + * @throws IOException if the data cannot be deserialized into a {@link ServiceRecord} + */ + public static AMRecord getAMRecord(final ChildData childData) throws IOException { + // Not a leaf path. Only leaf path contains AMRecord. 
+ if (!childData.getPath().contains(ApplicationId.appIdStrPrefix)) { + return null; + } + byte[] data = childData.getData(); + // only the path appeared, there is no data yet + if (data.length == 0) { + return null; + } + String value = new String(data, StandardCharsets.UTF_8); + try { + return AMRegistryUtils.jsonStringToRecord(value); + } catch (JsonParseException e) { + //Not a json AMRecord (SRV), could be some other data. + LOG.warn("Non-json data received while de-serializing AMRecord: {}. Ignoring...", value); + return null; + } + } + + public void start() throws Exception { + ZkConfig zkConf = new ZkConfig(this.conf); + client = zkConf.createCuratorFramework(); + cache = new TreeCache(client, zkConf.getZkNamespace()); + client.start(); + cache.start(); + listener = new ZkRegistryListener(); + cache.getListenable().addListener(listener); + } + + @Override + public AMRecord getRecord(ApplicationId appId) { + AMRecord rec = amRecordCache.get(appId); + // Return a copy. + return rec == null ? null : new AMRecord(rec); + } + + @Override + public List<AMRecord> getAllRecords() { + return amRecordCache.values().stream().map(AMRecord::new).collect(Collectors.toList()); + } + + @Override + public void close() { + IOUtils.closeQuietly(cache); + IOUtils.closeQuietly(client); + } + + public boolean isInitialized() { + return listener.initialized; + } + + /** + * Callback listener for ZooKeeper events that updates the local cache + * when child nodes under the monitored path change. 
+ */ + private class ZkRegistryListener implements TreeCacheListener { + + private boolean initialized = false; + + @Override + public void childEvent(final CuratorFramework clientParam, final TreeCacheEvent event) throws Exception { + Preconditions.checkArgument(clientParam != null && clientParam.getState() == CuratorFrameworkState.STARTED, + "Curator client is not started"); + + ChildData childData = event.getData(); + switch (event.getType()) { + case NODE_ADDED: + if (isEmpty(childData)) { + LOG.info("AppId allocated: {}", childData.getPath()); + } else { + AMRecord amRecord = getAMRecord(childData); + if (amRecord != null) { + LOG.info("AM registered with data: {}. Notifying listeners.", amRecord); + amRecordCache.put(amRecord.getApplicationId(), amRecord); + notifyOnAdded(amRecord); + } + } + break; + case NODE_UPDATED: + if (isEmpty(childData)) { + throw new RuntimeException("AM updated with empty data"); + } else { + AMRecord amRecord = getAMRecord(childData); + if (amRecord != null) { + LOG.info("AM updated data: {}. Notifying listeners.", amRecord); + amRecordCache.put(amRecord.getApplicationId(), amRecord); + notifyOnAdded(amRecord); Review Comment: shouldn't this call ` notifyOnUpdated(amRecord);` instead? ########## tez-api/src/main/java/org/apache/tez/client/registry/AMRegistry.java: ########## @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + + +import org.apache.hadoop.yarn.api.records.ApplicationId; + + +/** + * Base class for {@code AMRegistry} implementations. + * + * <p>The specific implementation is configured via the + * {@code tez.am.registry.class} property.</p> + * + * <p>Implementations are expected to provide appropriate service lifecycle + * behavior, including: + * <ul> + * <li>{@code init}</li> + * <li>{@code serviceStart}</li> + * <li>{@code serviceStop}</li> + * </ul> + * </p> + */ +public interface AMRegistry extends AutoCloseable { + + void add(AMRecord server) throws Exception; + + void remove(AMRecord server) throws Exception; + + ApplicationId generateNewId() throws Exception; + + AMRecord createAmRecord(ApplicationId appId, String hostName, String hostIp, int port, + String computeName); + + void close(); Review Comment: `AutoCloseable` already defines `close`, do we need to define again here? ########## tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkUtils.java: ########## @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.frameworkplugins; + + +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezReflectionException; + +public final class FrameworkUtils { + + private static final String SERVER_FRAMEWORK_SERVICE_INTERFACE_NAME = + "org.apache.tez.frameworkplugins.ServerFrameworkService"; + + private FrameworkUtils() {} + + /* + Searches for a FrameworkService provider which implements a target interface. + The interface should be either ClientFrameworkService or ServerFrameworkService. + Depending on which interface is used, either the client or server class of a + matching FrameworkMode will be used as the implementation. + + NOTE: Layering of FrameworkServices in a decorator-style is currently not supported + + An implementation is searched in the following order: + 1. If conf is not null and the parameter TEZ_FRAMEWORK_MODE is set: + the value of TEZ_FRAMEWORK_MODE from the conf will be used + 2. If conf is null or the parameter TEZ_FRAMEWORK_MODE is not set + and the environment var TEZ_FRAMEWORK_MODE is not empty: + the value of the environment var will be used + 3. Otherwise: the default class will be instantiated and returned + */ + public static <T extends FrameworkService> T get(Class<T> interfaze, @Nullable Configuration conf, + Class<?> defaultClazz) { + String modeInConf = conf != null ? 
conf.get(TezConfiguration.TEZ_FRAMEWORK_MODE) : null; + String modeInEnv = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); + try { + if (modeInConf != null) { + return getByMode(interfaze, modeInConf); + } else if (modeInEnv != null) { + return getByMode(interfaze, modeInEnv); + } else if (defaultClazz != null) { + return (T) defaultClazz.newInstance(); Review Comment: I think this is deprecated, does this work ``` defaultClazz.getDeclaredConstructor().newInstance(); ``` ########## tez-api/src/main/java/org/apache/tez/client/registry/AMRegistryClient.java: ########## @@ -0,0 +1,118 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry; + +import java.io.Closeable; +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import org.apache.hadoop.yarn.api.records.ApplicationId; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Client-side interface for discovering Application Master (AM) instances + * registered in the AM registry. + * + * <p>Implementations are responsible for locating AM endpoints and returning + * their metadata. 
This API is used by client components to discover running + * Tez AMs.</p> + * + * <p>Listeners may be registered to receive notifications when AM records + * appear or are removed.</p> + */ +public abstract class AMRegistryClient implements Closeable { + private static final Logger LOG = LoggerFactory.getLogger(AMRegistryClient.class); + + private final List<AMRegistryClientListener> listeners = new ArrayList<>(); + + /** + * Lookup AM metadata for the given application ID. + * + * @param appId the application ID + * @return the AM record if found, otherwise {@code null} + * @throws IOException if the lookup fails + */ + public abstract AMRecord getRecord(ApplicationId appId) throws IOException; + + /** + * Retrieve all AM records known in the registry. + * + * @return a list of AM records (possibly empty) + * @throws IOException if the fetch fails + */ + public abstract List<AMRecord> getAllRecords() throws IOException; + + /** + * Register a listener for AM registry events. + * The listener will be notified when AM records are added or removed. + * + * @param listener the listener to add + */ + public synchronized void addListener(AMRegistryClientListener listener) { + listeners.add(listener); + } + Review Comment: why don't we have `removeListener`? ########## tez-api/src/main/java/org/apache/tez/client/registry/zookeeper/ZkAMRegistryClient.java: ########## @@ -0,0 +1,209 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tez.client.registry.zookeeper; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.stream.Collectors; + +import org.apache.commons.io.IOUtils; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.imps.CuratorFrameworkState; +import org.apache.curator.framework.recipes.cache.ChildData; +import org.apache.curator.framework.recipes.cache.TreeCache; +import org.apache.curator.framework.recipes.cache.TreeCacheEvent; +import org.apache.curator.framework.recipes.cache.TreeCacheListener; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.tez.client.registry.AMRecord; +import org.apache.tez.client.registry.AMRegistryClient; +import org.apache.tez.client.registry.AMRegistryUtils; +import org.apache.tez.dag.api.TezConfiguration; + +import com.fasterxml.jackson.core.JsonParseException; +import com.google.common.base.Preconditions; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Curator/Zookeeper implementation of {@link AMRegistryClient}. 
+ */ [email protected] +public final class ZkAMRegistryClient extends AMRegistryClient { + private static final Logger LOG = LoggerFactory.getLogger(ZkAMRegistryClient.class); + private static final Map<String, ZkAMRegistryClient> INSTANCES = new HashMap<>(); + + private final Configuration conf; + // Cache of known AMs + private final ConcurrentHashMap<ApplicationId, AMRecord> amRecordCache = new ConcurrentHashMap<>(); + + private CuratorFramework client; + private TreeCache cache; + private ZkRegistryListener listener; + + private ZkAMRegistryClient(final Configuration conf) { + this.conf = conf; + } + + public static synchronized ZkAMRegistryClient getClient(final Configuration conf) { + String namespace = conf.get(TezConfiguration.TEZ_AM_REGISTRY_NAMESPACE); + ZkAMRegistryClient registry = INSTANCES.get(namespace); + if (registry == null) { + registry = new ZkAMRegistryClient(conf); + INSTANCES.put(namespace, registry); + } + LOG.info("Returning tez AM registry ({}) for namespace '{}'", System.identityHashCode(registry), namespace); + return registry; + } + + /** + * Deserializes a {@link ServiceRecord} from ZooKeeper data and converts it into an {@link AMRecord} + * for caching. + * + * @param childData the ZooKeeper node data containing a serialized {@link ServiceRecord} + * @return an {@link AMRecord} constructed from the deserialized {@link ServiceRecord}, or {@code null} + * if no data is present + * @throws IOException if the data cannot be deserialized into a {@link ServiceRecord} + */ + public static AMRecord getAMRecord(final ChildData childData) throws IOException { + // Not a leaf path. Only leaf path contains AMRecord. 
+ if (!childData.getPath().contains(ApplicationId.appIdStrPrefix)) { + return null; + } + byte[] data = childData.getData(); + // only the path appeared, there is no data yet + if (data.length == 0) { + return null; + } + String value = new String(data, StandardCharsets.UTF_8); + try { + return AMRegistryUtils.jsonStringToRecord(value); + } catch (JsonParseException e) { + //Not a json AMRecord (SRV), could be some other data. + LOG.warn("Non-json data received while de-serializing AMRecord: {}. Ignoring...", value); + return null; + } + } + + public void start() throws Exception { + ZkConfig zkConf = new ZkConfig(this.conf); + client = zkConf.createCuratorFramework(); + cache = new TreeCache(client, zkConf.getZkNamespace()); + client.start(); + cache.start(); + listener = new ZkRegistryListener(); + cache.getListenable().addListener(listener); + } + + @Override + public AMRecord getRecord(ApplicationId appId) { + AMRecord rec = amRecordCache.get(appId); + // Return a copy. + return rec == null ? null : new AMRecord(rec); + } + + @Override + public List<AMRecord> getAllRecords() { + return amRecordCache.values().stream().map(AMRecord::new).collect(Collectors.toList()); + } + + @Override + public void close() { + IOUtils.closeQuietly(cache); + IOUtils.closeQuietly(client); Review Comment: Should we clean up ``INSTANCES`` here as well? I suspect a leak here. ########## tez-api/src/main/java/org/apache/tez/frameworkplugins/FrameworkUtils.java: ########## @@ -0,0 +1,84 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tez.frameworkplugins; + + +import javax.annotation.Nullable; + +import org.apache.hadoop.conf.Configuration; +import org.apache.tez.common.ReflectionUtils; +import org.apache.tez.dag.api.TezConfiguration; +import org.apache.tez.dag.api.TezConstants; +import org.apache.tez.dag.api.TezReflectionException; + +public final class FrameworkUtils { + + private static final String SERVER_FRAMEWORK_SERVICE_INTERFACE_NAME = + "org.apache.tez.frameworkplugins.ServerFrameworkService"; + + private FrameworkUtils() {} + + /* + Searches for a FrameworkService provider which implements a target interface. + The interface should be either ClientFrameworkService or ServerFrameworkService. + Depending on which interface is used, either the client or server class of a + matching FrameworkMode will be used as the implementation. + + NOTE: Layering of FrameworkServices in a decorator-style is currently not supported + + An implementation is searched in the following order: + 1. If conf is not null and the parameter TEZ_FRAMEWORK_MODE is set: + the value of TEZ_FRAMEWORK_MODE from the conf will be used + 2. If conf is null or the parameter TEZ_FRAMEWORK_MODE is not set + and the environment var TEZ_FRAMEWORK_MODE is not empty: + the value of the environment var will be used + 3. Otherwise: the default class will be instantiated and returned + */ + public static <T extends FrameworkService> T get(Class<T> interfaze, @Nullable Configuration conf, + Class<?> defaultClazz) { + String modeInConf = conf != null ? 
conf.get(TezConfiguration.TEZ_FRAMEWORK_MODE) : null; + String modeInEnv = System.getenv(TezConstants.TEZ_FRAMEWORK_MODE); + try { + if (modeInConf != null) { Review Comment: should it be StringUtils.isNotEmpty()? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
