http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java new file mode 100644 index 0000000..4b0a852 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/SecureableZone.java @@ -0,0 +1,151 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.registry.server.dns; + +import org.xbill.DNS.DClass; +import org.xbill.DNS.NXTRecord; +import org.xbill.DNS.Name; +import org.xbill.DNS.RRset; +import org.xbill.DNS.Record; +import org.xbill.DNS.SetResponse; +import org.xbill.DNS.Type; +import org.xbill.DNS.Zone; +import org.xbill.DNS.ZoneTransferException; +import org.xbill.DNS.ZoneTransferIn; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.BitSet; +import java.util.Collections; +import java.util.Comparator; +import java.util.List; + +/** + * A zone implementation geared to support some DNSSEC functionality. + */ +public class SecureableZone extends Zone { + private List<Record> records; + + /** + * Creates a Zone by doing the specified zone transfer. + * @param xfrin The incoming zone transfer to execute. + * @throws IOException if there is an error. + * @throws ZoneTransferException if there is an error. + */ + public SecureableZone(ZoneTransferIn xfrin) + throws IOException, ZoneTransferException { + super(xfrin); + } + + /** + * Creates a Zone by performing a zone transfer to the specified host. + * @param zone zone name. + * @param dclass the dclass + * @param remote the remote host. + * @throws IOException if there is an error. + * @throws ZoneTransferException if there is an error. + */ + public SecureableZone(Name zone, int dclass, String remote) + throws IOException, ZoneTransferException { + super(zone, dclass, remote); + } + + /** + * Creates a Zone from the records in the specified master file. + * @param zone The name of the zone. + * @param file The master file to read from. + * @throws IOException if there is an error. + */ + public SecureableZone(Name zone, String file) throws IOException { + super(zone, file); + } + + /** + * Creates a Zone from an array of records. + * @param zone The name of the zone. + * @param records The records to add to the zone. + * @throws IOException if there is an error. + */ + public SecureableZone(Name zone, Record[] records) + throws IOException { + super(zone, records); + } + + /** + * Adds a Record to the Zone. + * @param r The record to be added + * @see Record + */ + @Override public void addRecord(Record r) { + if (records == null) { + records = new ArrayList<Record>(); + } + super.addRecord(r); + records.add(r); + } + + /** + * Removes a record from the Zone. + * @param r The record to be removed + * @see Record + */ + @Override public void removeRecord(Record r) { + if (records == null) { + records = new ArrayList<Record>(); + } + super.removeRecord(r); + records.remove(r); + } + + /** + * Return a NXT record appropriate for the query. + * @param queryRecord the query record. + * @param zone the zone to search. + * @return the NXT record describing the insertion point. + */ + @SuppressWarnings({"unchecked"}) + public Record getNXTRecord(Record queryRecord, Zone zone) { + Collections.sort(records); + + int index = Collections.binarySearch(records, queryRecord, + new Comparator<Record>() { + @Override public int compare(Record r1, Record r2) { + return r1.compareTo(r2); + } + }); + if (index >= 0) { + return null; + } + index = -index - 1; + if (index >= records.size()) { + index = records.size() - 1; + } + Record base = records.get(index); + SetResponse sr = zone.findRecords(base.getName(), Type.ANY); + BitSet bitMap = new BitSet(); + bitMap.set(Type.NXT); + RRset[] rRsets = sr.answers(); + for (RRset rRset : rRsets) { + int typeCode = rRset.getType(); + if (typeCode > 0 && typeCode < 128) { + bitMap.set(typeCode); + } + } + return new NXTRecord(base.getName(), DClass.IN, zone.getSOA().getMinimum(), + queryRecord.getName(), bitMap); + } +}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java new file mode 100644 index 0000000..b67cc7d --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ServiceRecordProcessor.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.registry.server.dns; + +import org.apache.hadoop.registry.client.types.ServiceRecord; + +import java.io.IOException; + +/** + * Manage the processing of service records in order to create DNS records. + */ +public interface ServiceRecordProcessor { + /** + * Initialize the mapping between DNS record type and record information + * for the given service record. + * @param serviceRecord the registry service record. + * @throws Exception if encountering an error. + */ + void initTypeToInfoMapping(ServiceRecord serviceRecord) + throws Exception; + + /** + * Return the DNS record types valid for this processor. + * @return the array of DNS record types. + */ + int[] getRecordTypes(); + + /** + * Manage the creation and registration of DNS records generated by parsing + * a service record. + * @param command the DNS registration command object (e.g. add_record, + * remove record) + * @throws IOException if the creation or registration generates an issue. + */ + void manageDNSRecords(RegistryDNS.RegistryCommand command) + throws IOException; + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java new file mode 100644 index 0000000..5043b85 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/ZoneSelector.java @@ -0,0 +1,33 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.registry.server.dns; + +import org.xbill.DNS.Name; +import org.xbill.DNS.Zone; + +/** + * A selector that returns the zone associated with a provided name. + */ +public interface ZoneSelector { + /** + * Finds the best matching zone given the provided name. + * @param name the record name for which a zone is requested. + * @return the matching zone. + */ + Zone findBestZone(Name name); +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java new file mode 100644 index 0000000..00d8c9db --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/dns/package-info.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * DNS Server classes. + * <p> + * These classes are leveraged to create a DNS server that can provide the + * facilities necessary for YARN application and/or service discovery. + * </p> + */ +package org.apache.hadoop.registry.server.dns; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java new file mode 100644 index 0000000..6a1993e --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/SelectByYarnPersistence.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.integration; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.registry.client.types.yarn.YarnRegistryAttributes; +import org.apache.hadoop.registry.server.services.RegistryAdminService; + +/** + * Select an entry by the YARN persistence policy + */ +public class SelectByYarnPersistence + implements RegistryAdminService.NodeSelector { + private final String id; + private final String targetPolicy; + + public SelectByYarnPersistence(String id, String targetPolicy) { + Preconditions.checkArgument(!StringUtils.isEmpty(id), "id"); + Preconditions.checkArgument(!StringUtils.isEmpty(targetPolicy), + "targetPolicy"); + this.id = id; + this.targetPolicy = targetPolicy; + } + + @Override + public boolean shouldSelect(String path, + RegistryPathStatus registryPathStatus, + ServiceRecord serviceRecord) { + String policy = + serviceRecord.get(YarnRegistryAttributes.YARN_PERSISTENCE, ""); + return id.equals(serviceRecord.get(YarnRegistryAttributes.YARN_ID, "")) + && (targetPolicy.equals(policy)); + } + + @Override + public String toString() { + return String.format( + "Select by ID %s and policy %s: {}", + id, targetPolicy); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java new file mode 100644 index 0000000..22d8bc5 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/integration/package-info.java @@ -0,0 +1,23 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * This package contains the classes which integrate with the YARN resource + * manager. + */ +package org.apache.hadoop.registry.server.integration; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java new file mode 100644 index 0000000..6962eb8 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/package-info.java @@ -0,0 +1,27 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Server-side classes for the registry + * <p> + * These are components intended to be deployed only on servers or in test + * JVMs, rather than on client machines. + * <p> + * Example components are: server-side ZK support, a REST service, etc. + */ +package org.apache.hadoop.registry.server; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java new file mode 100644 index 0000000..9faede4 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/AddingCompositeService.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.service.CompositeService; +import org.apache.hadoop.service.Service; + +/** + * Composite service that exports the add/remove methods. + * <p> + * This allows external classes to add services to these methods, after which + * they follow the same lifecyce. + * <p> + * It is essential that any service added is in a state where it can be moved + * on with that of the parent services. Specifically, do not add an uninited + * service to a parent that is already inited âas the <code>start</code> + * operation will then fail + * + */ [email protected] [email protected] +public class AddingCompositeService extends CompositeService { + + + public AddingCompositeService(String name) { + super(name); + } + + @Override + public void addService(Service service) { + super.addService(service); + } + + @Override + public boolean removeService(Service service) { + return super.removeService(service); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java new file mode 100644 index 0000000..829ef68 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/DeleteCompletionCallback.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.curator.framework.api.CuratorEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Curator callback for delete operations completing. + * <p> + * This callback logs at debug and increments the event counter. + */ +public class DeleteCompletionCallback implements BackgroundCallback { + private static final Logger LOG = + LoggerFactory.getLogger(DeleteCompletionCallback.class); + + private AtomicInteger events = new AtomicInteger(0); + + @Override + public void processResult(CuratorFramework client, + CuratorEvent event) throws + Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Delete event {}", event); + } + events.incrementAndGet(); + } + + /** + * Get the number of deletion events + * @return the count of events + */ + public int getEventCount() { + return events.get(); + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java new file mode 100644 index 0000000..b6cf9fc --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperService.java @@ -0,0 +1,282 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + +import com.google.common.base.Preconditions; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.ensemble.fixed.FixedEnsembleProvider; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileUtil; +import org.apache.hadoop.service.AbstractService; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.client.impl.zk.BindingInformation; +import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource; +import org.apache.hadoop.registry.client.impl.zk.RegistryInternalConstants; +import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; +import org.apache.hadoop.registry.client.impl.zk.ZookeeperConfigOptions; +import org.apache.zookeeper.server.ServerCnxnFactory; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.server.persistence.FileTxnSnapLog; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; +import java.io.PrintWriter; +import java.io.StringWriter; +import java.net.InetSocketAddress; +import java.net.UnknownHostException; + +/** + * This is a small, localhost Zookeeper service instance that is contained + * in a YARN service...it's been derived from Apache Twill. + * <p> + * It implements {@link RegistryBindingSource} and provides binding information, + * <i>once started</i>. Until {@link #start()} is called, the hostname and + * port may be undefined. Accordingly, the service raises an exception in this + * condition. + * <p> + * If you wish to chain together a registry service with this one under + * the same {@code CompositeService}, this service must be added + * as a child first. + * <p> + * It also sets the configuration parameter + * {@link RegistryConstants#KEY_REGISTRY_ZK_QUORUM} + * to its connection string. Any code with access to the service configuration + * can view it. + */ [email protected] +public class MicroZookeeperService + extends AbstractService + implements RegistryBindingSource, RegistryConstants, + ZookeeperConfigOptions, + MicroZookeeperServiceKeys{ + + + private static final Logger + LOG = LoggerFactory.getLogger(MicroZookeeperService.class); + + private File instanceDir; + private File dataDir; + private int tickTime; + private int port; + private String host; + private boolean secureServer; + + private ServerCnxnFactory factory; + private BindingInformation binding; + private File confDir; + private StringBuilder diagnostics = new StringBuilder(); + + /** + * Create an instance + * @param name service name + */ + public MicroZookeeperService(String name) { + super(name); + } + + /** + * Get the connection string. + * @return the string + * @throws IllegalStateException if the connection is not yet valid + */ + public String getConnectionString() { + Preconditions.checkState(factory != null, "service not started"); + InetSocketAddress addr = factory.getLocalAddress(); + return String.format("%s:%d", addr.getHostName(), addr.getPort()); + } + + /** + * Get the connection address + * @return the connection as an address + * @throws IllegalStateException if the connection is not yet valid + */ + public InetSocketAddress getConnectionAddress() { + Preconditions.checkState(factory != null, "service not started"); + return factory.getLocalAddress(); + } + + /** + * Create an inet socket addr from the local host + port number + * @param port port to use + * @return a (hostname, port) pair + * @throws UnknownHostException if the server cannot resolve the host + */ + private InetSocketAddress getAddress(int port) throws UnknownHostException { + return new InetSocketAddress(host, port < 0 ? 0 : port); + } + + /** + * Initialize the service, including choosing a path for the data + * @param conf configuration + * @throws Exception + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + port = conf.getInt(KEY_ZKSERVICE_PORT, 0); + tickTime = conf.getInt(KEY_ZKSERVICE_TICK_TIME, + ZooKeeperServer.DEFAULT_TICK_TIME); + String instancedirname = conf.getTrimmed( + KEY_ZKSERVICE_DIR, ""); + host = conf.getTrimmed(KEY_ZKSERVICE_HOST, DEFAULT_ZKSERVICE_HOST); + if (instancedirname.isEmpty()) { + File testdir = new File(System.getProperty("test.dir", "target")); + instanceDir = new File(testdir, "zookeeper" + getName()); + } else { + instanceDir = new File(instancedirname); + FileUtil.fullyDelete(instanceDir); + } + LOG.debug("Instance directory is {}", instanceDir); + mkdirStrict(instanceDir); + dataDir = new File(instanceDir, "data"); + confDir = new File(instanceDir, "conf"); + mkdirStrict(dataDir); + mkdirStrict(confDir); + super.serviceInit(conf); + } + + /** + * Create a directory, ignoring if the dir is already there, + * and failing if a file or something else was at the end of that + * path + * @param dir dir to guarantee the existence of + * @throws IOException IO problems, or path exists but is not a dir + */ + private void mkdirStrict(File dir) throws IOException { + if (!dir.mkdirs()) { + if (!dir.isDirectory()) { + throw new IOException("Failed to mkdir " + dir); + } + } + } + + /** + * Append a formatted string to the diagnostics. + * <p> + * A newline is appended afterwards. + * @param text text including any format commands + * @param args arguments for the forma operation. + */ + protected void addDiagnostics(String text, Object ... args) { + diagnostics.append(String.format(text, args)).append('\n'); + } + + /** + * Get the diagnostics info + * @return the diagnostics string built up + */ + public String getDiagnostics() { + return diagnostics.toString(); + } + + /** + * set up security. this must be done prior to creating + * the ZK instance, as it sets up JAAS if that has not been done already. + * + * @return true if the cluster has security enabled. + */ + public boolean setupSecurity() throws IOException { + Configuration conf = getConfig(); + String jaasContext = conf.getTrimmed(KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT); + secureServer = StringUtils.isNotEmpty(jaasContext); + if (secureServer) { + RegistrySecurity.validateContext(jaasContext); + RegistrySecurity.bindZKToServerJAASContext(jaasContext); + // policy on failed auth + System.setProperty(PROP_ZK_ALLOW_FAILED_SASL_CLIENTS, + conf.get(KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS, + "true")); + + //needed so that you can use sasl: strings in the registry + System.setProperty(RegistryInternalConstants.ZOOKEEPER_AUTH_PROVIDER +".1", + RegistryInternalConstants.SASLAUTHENTICATION_PROVIDER); + String serverContext = + System.getProperty(PROP_ZK_SERVER_SASL_CONTEXT); + addDiagnostics("Server JAAS context s = %s", serverContext); + return true; + } else { + return false; + } + } + + /** + * Startup: start ZK. It is only after this that + * the binding information is valid. + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + + setupSecurity(); + + ZooKeeperServer zkServer = new ZooKeeperServer(); + FileTxnSnapLog ftxn = new FileTxnSnapLog(dataDir, dataDir); + zkServer.setTxnLogFactory(ftxn); + zkServer.setTickTime(tickTime); + + LOG.info("Starting Local Zookeeper service"); + factory = ServerCnxnFactory.createFactory(); + factory.configure(getAddress(port), -1); + factory.startup(zkServer); + + String connectString = getConnectionString(); + LOG.info("In memory ZK started at {}\n", connectString); + + if (LOG.isDebugEnabled()) { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + zkServer.dumpConf(pw); + pw.flush(); + LOG.debug(sw.toString()); + } + binding = new BindingInformation(); + binding.ensembleProvider = new FixedEnsembleProvider(connectString); + binding.description = + getName() + " reachable at \"" + connectString + "\""; + + addDiagnostics(binding.description); + // finally: set the binding information in the config + getConfig().set(KEY_REGISTRY_ZK_QUORUM, connectString); + } + + /** + * When the service is stopped, it deletes the data directory + * and its contents + * @throws Exception + */ + @Override + protected void serviceStop() throws Exception { + if (factory != null) { + factory.shutdown(); + factory = null; + } + if (dataDir != null) { + FileUtil.fullyDelete(dataDir); + } + } + + @Override + public BindingInformation supplyBindingInformation() { + Preconditions.checkNotNull(binding, + "Service is not started: binding information undefined"); + return binding; + } +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java new file mode 100644 index 0000000..f4f4976 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/MicroZookeeperServiceKeys.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + +import org.apache.hadoop.registry.client.api.RegistryConstants; + +/** + * Service keys for configuring the {@link MicroZookeeperService}. + * These are not used in registry clients or the RM-side service, + * so are kept separate. + */ +public interface MicroZookeeperServiceKeys { + public static final String ZKSERVICE_PREFIX = + RegistryConstants.REGISTRY_PREFIX + "zk.service."; + /** + * Key to define the JAAS context for the ZK service: {@value}. + */ + public static final String KEY_REGISTRY_ZKSERVICE_JAAS_CONTEXT = + ZKSERVICE_PREFIX + "service.jaas.context"; + + /** + * ZK servertick time: {@value} + */ + public static final String KEY_ZKSERVICE_TICK_TIME = + ZKSERVICE_PREFIX + "ticktime"; + + /** + * host to register on: {@value}. + */ + public static final String KEY_ZKSERVICE_HOST = ZKSERVICE_PREFIX + "host"; + /** + * Default host to serve on -this is <code>localhost</code> as it + * is the only one guaranteed to be available: {@value}. + */ + public static final String DEFAULT_ZKSERVICE_HOST = "localhost"; + /** + * port; 0 or below means "any": {@value} + */ + public static final String KEY_ZKSERVICE_PORT = ZKSERVICE_PREFIX + "port"; + + /** + * Directory containing data: {@value} + */ + public static final String KEY_ZKSERVICE_DIR = ZKSERVICE_PREFIX + "dir"; + + /** + * Should failed SASL clients be allowed: {@value}? + * + * Default is the ZK default: true + */ + public static final String KEY_ZKSERVICE_ALLOW_FAILED_SASL_CLIENTS = + ZKSERVICE_PREFIX + "allow.failed.sasl.clients"; +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java new file mode 100644 index 0000000..d60797e --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/RegistryAdminService.java @@ -0,0 +1,529 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry.server.services; + + +import com.google.common.annotations.VisibleForTesting; +import org.apache.commons.lang3.StringUtils; +import org.apache.curator.framework.api.BackgroundCallback; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathIsNotEmptyDirectoryException; +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.service.ServiceStateException; +import org.apache.hadoop.registry.client.binding.RegistryUtils; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.exceptions.InvalidRecordException; +import org.apache.hadoop.registry.client.exceptions.NoPathPermissionsException; +import org.apache.hadoop.registry.client.exceptions.NoRecordException; +import org.apache.hadoop.registry.client.impl.zk.RegistryBindingSource; +import org.apache.hadoop.registry.client.impl.zk.RegistryOperationsService; +import org.apache.hadoop.registry.client.impl.zk.RegistrySecurity; +import org.apache.hadoop.registry.client.types.RegistryPathStatus; +import org.apache.hadoop.registry.client.types.ServiceRecord; +import org.apache.hadoop.util.concurrent.HadoopExecutors; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.ZooDefs; +import org.apache.zookeeper.data.ACL; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InterruptedIOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.List; +import java.util.Locale; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.atomic.AtomicInteger; + +/** + * Administrator service for the registry. This is the one with + * permissions to create the base directories and those for users. + * + * It also includes support for asynchronous operations, so that + * zookeeper connectivity problems do not hold up the server code + * performing the actions. + * + * Any action queued via {@link #submit(Callable)} will be + * run asynchronously. The {@link #createDirAsync(String, List, boolean)} + * is an example of such an an action + * + * A key async action is the depth-first tree purge, which supports + * pluggable policies for deleting entries. The method + * {@link #purge(String, NodeSelector, PurgePolicy, BackgroundCallback)} + * implements the recursive purge operation âthe class + * {{AsyncPurge}} provides the asynchronous scheduling of this. + */ +public class RegistryAdminService extends RegistryOperationsService { + + private static final Logger LOG = + LoggerFactory.getLogger(RegistryAdminService.class); + /** + * The ACL permissions for the user's homedir ACL. + */ + public static final int USER_HOMEDIR_ACL_PERMISSIONS = + ZooDefs.Perms.READ | ZooDefs.Perms.WRITE + | ZooDefs.Perms.CREATE | ZooDefs.Perms.DELETE; + + /** + * Executor for async operations + */ + protected final ExecutorService executor; + + /** + * Construct an instance of the service + * @param name service name + */ + public RegistryAdminService(String name) { + this(name, null); + } + + /** + * construct an instance of the service, using the + * specified binding source to bond to ZK + * @param name service name + * @param bindingSource provider of ZK binding information + */ + public RegistryAdminService(String name, + RegistryBindingSource bindingSource) { + super(name, bindingSource); + executor = HadoopExecutors.newCachedThreadPool( + new ThreadFactory() { + private AtomicInteger counter = new AtomicInteger(1); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, + "RegistryAdminService " + counter.getAndIncrement()); + } + }); + } + + /** + * Stop the service: halt the executor. + * @throws Exception exception. + */ + @Override + protected void serviceStop() throws Exception { + stopExecutor(); + super.serviceStop(); + } + + /** + * Stop the executor if it is not null. + * This uses {@link ExecutorService#shutdownNow()} + * and so does not block until they have completed. + */ + protected synchronized void stopExecutor() { + if (executor != null) { + executor.shutdownNow(); + } + } + + /** + * Get the executor + * @return the executor + */ + protected ExecutorService getExecutor() { + return executor; + } + + /** + * Submit a callable + * @param callable callable + * @param <V> type of the final get + * @return a future to wait on + */ + public <V> Future<V> submit(Callable<V> callable) { + if (LOG.isDebugEnabled()) { + LOG.debug("Submitting {}", callable); + } + return getExecutor().submit(callable); + } + + /** + * Asynchronous operation to create a directory + * @param path path + * @param acls ACL list + * @param createParents flag to indicate parent dirs should be created + * as needed + * @return the future which will indicate whether or not the operation + * succeeded âand propagate any exceptions + * @throws IOException + */ + public Future<Boolean> createDirAsync(final String path, + final List<ACL> acls, + final boolean createParents) throws IOException { + return submit(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + return maybeCreate(path, CreateMode.PERSISTENT, + acls, createParents); + } + }); + } + + /** + * Init operation sets up the system ACLs. + * @param conf configuration of the service + * @throws Exception + */ + @Override + protected void serviceInit(Configuration conf) throws Exception { + super.serviceInit(conf); + RegistrySecurity registrySecurity = getRegistrySecurity(); + if (registrySecurity.isSecureRegistry()) { + ACL sasl = registrySecurity.createSaslACLFromCurrentUser(ZooDefs.Perms.ALL); + registrySecurity.addSystemACL(sasl); + LOG.info("Registry System ACLs:", + RegistrySecurity.aclsToString( + registrySecurity.getSystemACLs())); + } + } + + /** + * Start the service, including creating base directories with permissions + * @throws Exception + */ + @Override + protected void serviceStart() throws Exception { + super.serviceStart(); + // create the root directories + try { + createRootRegistryPaths(); + } catch (NoPathPermissionsException e) { + + String message = String.format(Locale.ENGLISH, + "Failed to create root paths {%s};" + + "%ndiagnostics={%s}" + + "%ncurrent registry is:" + + "%n{%s}", + e, + bindingDiagnosticDetails(), + dumpRegistryRobustly(true)); + + LOG.error(" Failure {}", e, e); + LOG.error(message); + + // TODO: this is something temporary to deal with the problem + // that jenkins is failing this test + throw new NoPathPermissionsException(e.getPath().toString(), message, e); + } + } + + /** + * Create the initial registry paths + * @throws IOException any failure + */ + @VisibleForTesting + public void createRootRegistryPaths() throws IOException { + + List<ACL> systemACLs = getRegistrySecurity().getSystemACLs(); + LOG.info("System ACLs {}", + RegistrySecurity.aclsToString(systemACLs)); + maybeCreate("", CreateMode.PERSISTENT, systemACLs, false); + maybeCreate(PATH_USERS, CreateMode.PERSISTENT, + systemACLs, false); + maybeCreate(PATH_SYSTEM_SERVICES, + CreateMode.PERSISTENT, + systemACLs, false); + } + + /** + * Get the path to a user's home dir + * @param username username + * @return a path for services underneath + */ + protected String homeDir(String username) { + return RegistryUtils.homePathForUser(username); + } + + /** + * Set up the ACL for the user. + * <b>Important: this must run client-side as it needs + * to know the id:pass tuple for a user</b> + * @param username user name + * @param perms permissions + * @return an ACL list + * @throws IOException ACL creation/parsing problems + */ + public List<ACL> aclsForUser(String username, int perms) throws IOException { + List<ACL> clientACLs = getClientAcls(); + RegistrySecurity security = getRegistrySecurity(); + if (security.isSecureRegistry()) { + clientACLs.add(security.createACLfromUsername(username, perms)); + } + return clientACLs; + } + + /** + * Start an async operation to create the home path for a user + * if it does not exist + * @param shortname username, without any @REALM in kerberos + * @return the path created + * @throws IOException any failure while setting up the operation + * + */ + public Future<Boolean> initUserRegistryAsync(final String shortname) + throws IOException { + + String homeDir = homeDir(shortname); + if (!exists(homeDir)) { + // create the directory. The user does not + return createDirAsync(homeDir, + aclsForUser(shortname, + USER_HOMEDIR_ACL_PERMISSIONS), + false); + } + return null; + } + + /** + * Create the home path for a user if it does not exist. + * + * This uses {@link #initUserRegistryAsync(String)} and then waits for the + * result ... the code path is the same as the async operation; this just + * picks up and relays/converts exceptions + * @param username username + * @return the path created + * @throws IOException any failure + * + */ + public String initUserRegistry(final String username) + throws IOException { + + try { + Future<Boolean> future = initUserRegistryAsync(username); + future.get(); + } catch (InterruptedException e) { + throw (InterruptedIOException) + (new InterruptedIOException(e.toString()).initCause(e)); + } catch (ExecutionException e) { + Throwable cause = e.getCause(); + if (cause instanceof IOException) { + throw (IOException) (cause); + } else { + throw new IOException(cause.toString(), cause); + } + } + + return homeDir(username); + } + + /** + * Method to validate the validity of the kerberos realm. + * <ul> + * <li>Insecure: not needed.</li> + * <li>Secure: must have been determined.</li> + * </ul> + */ + protected void verifyRealmValidity() throws ServiceStateException { + if (isSecure()) { + String realm = getRegistrySecurity().getKerberosRealm(); + if (StringUtils.isEmpty(realm)) { + throw new ServiceStateException("Cannot determine service realm"); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Started Registry operations in realm {}", realm); + } + } + } + + /** + * Policy to purge entries + */ + public enum PurgePolicy { + PurgeAll, + FailOnChildren, + SkipOnChildren + } + + /** + * Recursive operation to purge all matching records under a base path. + * <ol> + * <li>Uses a depth first search</li> + * <li>A match is on ID and persistence policy, or, if policy==-1, any match</li> + * <li>If a record matches then it is deleted without any child searches</li> + * <li>Deletions will be asynchronous if a callback is provided</li> + * </ol> + * + * The code is designed to be robust against parallel deletions taking place; + * in such a case it will stop attempting that part of the tree. This + * avoid the situation of more than 1 purge happening in parallel and + * one of the purge operations deleteing the node tree above the other. + * @param path base path + * @param selector selector for the purge policy + * @param purgePolicy what to do if there is a matching record with children + * @param callback optional curator callback + * @return the number of delete operations perfomed. As deletes may be for + * everything under a path, this may be less than the number of records + * actually deleted + * @throws IOException problems + * @throws PathIsNotEmptyDirectoryException if an entry cannot be deleted + * as it has children and the purge policy is FailOnChildren + */ + @VisibleForTesting + public int purge(String path, + NodeSelector selector, + PurgePolicy purgePolicy, + BackgroundCallback callback) throws IOException { + + + boolean toDelete = false; + // look at self to see if it has a service record + Map<String, RegistryPathStatus> childEntries; + Collection<RegistryPathStatus> entries; + try { + // list this path's children + childEntries = RegistryUtils.statChildren(this, path); + entries = childEntries.values(); + } catch (PathNotFoundException e) { + // there's no record here, it may have been deleted already. + // exit + return 0; + } + + try { + RegistryPathStatus registryPathStatus = stat(path); + ServiceRecord serviceRecord = resolve(path); + // there is now an entry here. + toDelete = selector.shouldSelect(path, registryPathStatus, serviceRecord); + } catch (EOFException ignored) { + // ignore + } catch (InvalidRecordException ignored) { + // ignore + } catch (NoRecordException ignored) { + // ignore + } catch (PathNotFoundException e) { + // there's no record here, it may have been deleted already. + // exit + return 0; + } + + if (toDelete && !entries.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("Match on record @ {} with children ", path); + } + // there's children + switch (purgePolicy) { + case SkipOnChildren: + // don't do the deletion... continue to next record + if (LOG.isDebugEnabled()) { + LOG.debug("Skipping deletion"); + } + toDelete = false; + break; + case PurgeAll: + // mark for deletion + if (LOG.isDebugEnabled()) { + LOG.debug("Scheduling for deletion with children"); + } + toDelete = true; + entries = new ArrayList<RegistryPathStatus>(0); + break; + case FailOnChildren: + if (LOG.isDebugEnabled()) { + LOG.debug("Failing deletion operation"); + } + throw new PathIsNotEmptyDirectoryException(path); + } + } + + int deleteOps = 0; + if (toDelete) { + try { + zkDelete(path, true, callback); + } catch (PathNotFoundException e) { + // sign that the path was deleted during the operation. + // this is a no-op, and all children can be skipped + return deleteOps; + } + deleteOps++; + } + + // now go through the children + for (RegistryPathStatus status : entries) { + String childname = status.path; + String childpath = RegistryPathUtils.join(path, childname); + deleteOps += purge(childpath, + selector, + purgePolicy, + callback); + } + + return deleteOps; + } + + /** + * Comparator used for purge logic + */ + public interface NodeSelector { + + boolean shouldSelect(String path, + RegistryPathStatus registryPathStatus, + ServiceRecord serviceRecord); + } + + /** + * An async registry purge action taking + * a selector which decides what to delete + */ + public class AsyncPurge implements Callable<Integer> { + + private final BackgroundCallback callback; + private final NodeSelector selector; + private final String path; + private final PurgePolicy purgePolicy; + + public AsyncPurge(String path, + NodeSelector selector, + PurgePolicy purgePolicy, + BackgroundCallback callback) { + this.callback = callback; + this.selector = selector; + this.path = path; + this.purgePolicy = purgePolicy; + } + + @Override + public Integer call() throws Exception { + if (LOG.isDebugEnabled()) { + LOG.debug("Executing {}", this); + } + return purge(path, + selector, + purgePolicy, + callback); + } + + @Override + public String toString() { + return String.format( + "Record purge under %s with selector %s", + path, selector); + } + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java new file mode 100644 index 0000000..4bca195 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/java/org/apache/hadoop/registry/server/services/package-info.java @@ -0,0 +1,39 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +/** + * Basic services for the YARN registry + * <ul> + * <li> + * The {@link org.apache.hadoop.registry.server.services.RegistryAdminService} + * extends the shared YARN Registry client with registry setup and + * (potentially asynchronous) administrative actions. + * </li> + * <li> + * The {@link org.apache.hadoop.registry.server.services.MicroZookeeperService} + * is a transient Zookeeper instance bound to the YARN service lifecycle. + * It is suitable for testing. + * </li> + * <li> + * The {@link org.apache.hadoop.registry.server.services.AddingCompositeService} + * extends the standard YARN composite service by making its add and remove + * methods public. It is a utility service used in parts of the codebase + * </li> + * </ul> + */ +package org.apache.hadoop.registry.server.services; http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/main/tla/hadoopregistry.tla ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/main/tla/hadoopregistry.tla b/hadoop-common-project/hadoop-registry/src/main/tla/hadoopregistry.tla new file mode 100644 index 0000000..ace3cb2 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/main/tla/hadoopregistry.tla @@ -0,0 +1,582 @@ +---------------------------- MODULE hadoopregistry ---------------------------- + +EXTENDS FiniteSets, Sequences, Naturals, TLC + + +(* +============================================================================ + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. +============================================================================ + *) + +(* + +============================================================================ + +This defines the Hadoop registry in terms of operations on sets of records. + +Every registry entry is represented as a record containing both the path and the data. + +It assumes that + +1. operations on this set are immediate. +2. selection operations (such as \A and \E are atomic) +3. changes are immediately visible to all other users of the registry. +4. This clearly implies that changes are visible in the sequence in which they happen. + +A multi-server Zookeeper-based registry may not meet all those assumptions + +1. changes may take time to propagate across the ZK quorum, hence changes cannot +be considered immediate from the perspective of other registry clients. +(assumptions (1) and (3)). + +2. Selection operations may not be atomic. (assumption (2)). + +Operations will still happen in the order received by the elected ZK master + +A stricter definition would try to state that all operations are eventually +true excluding other changes happening during a sequence of action. +This is left as an excercise for the reader. + +The specification also omits all coverage of the permissions policy. +*) + + + +CONSTANTS + PathChars, \* the set of valid characters in a path + Paths, \* the set of all possible valid paths + Data, \* the set of all possible sequences of bytes + Address, \* the set of all possible address n-tuples + Addresses, \* the set of all possible address instances + Endpoints , \* the set of all possible endpoints + PersistPolicies,\* the set of persistence policies + ServiceRecords, \* all service records + Registries, \* the set of all possile registries + BindActions, \* all possible put actions + DeleteActions, \* all possible delete actions + PurgeActions, \* all possible purge actions + MknodeActions \* all possible mkdir actions + + +ASSUME PathChars \in STRING +ASSUME Paths \in STRING + +(* Data in records is JSON, hence a string *) +ASSUME Data \in STRING + +---------------------------------------------------------------------------------------- + +(* the registry*) +VARIABLE registry + + +(* Sequence of actions to apply to the registry *) +VARIABLE actions + + +---------------------------------------------------------------------------------------- +(* Tuple of all variables. *) + + +vars == << registry, actions >> + + +---------------------------------------------------------------------------------------- + + + + +(* Persistence policy *) +PersistPolicySet == { + "permanent", \* persists until explicitly removed + "application", \* persists until the application finishes + "application-attempt", \* persists until the application attempt finishes + "container" \* persists until the container finishes + } + +(* Type invariants. *) +TypeInvariant == + /\ \A p \in PersistPolicies: p \in PersistPolicySet + + +---------------------------------------------------------------------------------------- + + + +(* + +An Entry is defined as a path, and the actual +data which it contains. + +By including the path in an entry, we avoid having to define some +function mapping Path -> entry. Instead a registry can be defined as a +set of RegistryEntries matching the validity critera. + +*) + +RegistryEntry == [ + \* The path to the entry + path: Paths, + + \* the data in the entry + data: Data + ] + + +(* Define the set of all string to string mappings *) + +StringMap == [ + STRING |-> STRING +] + + + +(* + An endpoint in a service record +*) +Endpoint == [ + \* API of the endpoint: some identifier + api: STRING, + + \* A list of address n-tuples + addresses: Addresses +] + +(* + A service record +*) +ServiceRecord == [ + + \* This MUST be present: if it is not then the data is not a service record + \* This permits shortcut scan & reject of byte arrays without parsing + type: "JSONServiceRecord", + + \*A description + description: STRING, + + \* A set of endpoints + external: Endpoints, + + \* Endpoints intended for use internally + internal: Endpoints, + + \* Attributes are a function + attributes: StringMap +] + +---------------------------------------------------------------------------------------- + +(* + There is an operation serialize whose internals are not defined, + Which converts the service records to JSON + *) + +CONSTANT serialize(_) + +(* A function which returns true iff the byte stream is considered a valid service record. *) +CONSTANT containsServiceRecord(_) + +(* A function to deserialize a string to JSON *) +CONSTANT deserialize(_) + +ASSUME \A json \in STRING: containsServiceRecord(json) \in BOOLEAN + +(* Records can be serialized *) +ASSUME \A r \in ServiceRecord : serialize(r) \in STRING /\ containsServiceRecord(serialize(r)) + +(* All strings for which containsServiceRecord() holds can be deserialized *) +ASSUME \A json \in STRING: containsServiceRecord(json) => deserialize(json) \in ServiceRecord + + + + +---------------------------------------------------------------------------------------- + +(* Action Records *) + +putAction == [ + type: "put", + record: ServiceRecord +] + +deleteAction == [ + type: "delete", + path: STRING, + recursive: BOOLEAN +] + +purgeAction == [ + type: "purge", + path: STRING, + persistence: PersistPolicySet +] + +mkNodeAction == [ + type: "mknode", + path: STRING, + parents: BOOLEAN +] + + +---------------------------------------------------------------------------------------- + +(* + + Path operations + +*) + +(* +Parent is defined for non empty sequences + *) + +parent(path) == SubSeq(path, 1, Len(path)-1) + +isParent(path, c) == path = parent(c) + +---------------------------------------------------------------------------------------- +(* +Registry Access Operations +*) + +(* +Lookup all entries in a registry with a matching path +*) + +resolve(Registry, path) == \A entry \in Registry: entry.path = path + +(* +A path exists in the registry iff there is an entry with that path +*) + +exists(Registry, path) == resolve(Registry, path) /= {} + +(* +A parent entry, or an empty set if there is none +*) +parentEntry(Registry, path) == resolve(Registry, parent(path)) + +(* +A root path is the empty sequence +*) +isRootPath(path) == path = <<>> + +(* +The root entry is the entry whose path is the root path +*) +isRootEntry(entry) == entry.path = <<>> + + +(* +A path p is an ancestor of another path d if they are different, and the path d +starts with path p +*) + +isAncestorOf(path, d) == + /\ path /= d + /\ \E k : SubSeq(d, 0, k) = path + + +ancestorPathOf(path) == + \A a \in Paths: isAncestorOf(a, path) + +(* +The set of all children of a path in the registry +*) + +children(R, path) == \A c \in R: isParent(path, c.path) + +(* +A path has children if the children() function does not return the empty set +*) +hasChildren(R, path) == children(R, path) /= {} + +(* +Descendant: a child of a path or a descendant of a child of a path +*) + +descendants(R, path) == \A e \in R: isAncestorOf(path, e.path) + +(* +Ancestors: all entries in the registry whose path is an entry of the path argument +*) +ancestors(R, path) == \A e \in R: isAncestorOf(e.path, path) + +(* +The set of entries that are a path and its descendants +*) +pathAndDescendants(R, path) == + \/ \A e \in R: isAncestorOf(path, e.path) + \/ resolve(R, path) + + +(* +For validity, all entries must match the following criteria + *) + +validRegistry(R) == + \* there can be at most one entry for a path. + /\ \A e \in R: Cardinality(resolve(R, e.path)) = 1 + + \* There's at least one root entry + /\ \E e \in R: isRootEntry(e) + + \* an entry must be the root entry or have a parent entry + /\ \A e \in R: isRootEntry(e) \/ exists(R, parent(e.path)) + + \* If the entry has data, it must contain a service record + /\ \A e \in R: (e.data = << >> \/ containsServiceRecord(e.data)) + + +---------------------------------------------------------------------------------------- +(* +Registry Manipulation +*) + +(* +An entry can be put into the registry iff +its parent is present or it is the root entry +*) +canBind(R, e) == + isRootEntry(e) \/ exists(R, parent(e.path)) + +(* +'bind() adds/replaces an entry if permitted +*) + +bind(R, e) == + /\ canBind(R, e) + /\ R' = (R \ resolve(R, e.path)) \union {e} + + +(* +mknode() adds a new empty entry where there was none before, iff +-the parent exists +-it meets the requirement for being "bindable" +*) + +mknodeSimple(R, path) == + LET entry == [ path |-> path, data |-> <<>> ] + IN \/ exists(R, path) + \/ (exists(R, parent(path)) /\ canBind(R, entry) /\ (R' = R \union {entry} )) + + +(* +For all parents, the mknodeSimple() criteria must apply. +This could be defined recursively, though as TLA+ does not support recursion, +an alternative is required + + +Because this specification is declaring the final state of a operation, not +the implemental, all that is needed is to describe those parents. + +It declares that the mknodeSimple() state applies to the path and all +its parents in the set R' + +*) +mknodeWithParents(R, path) == + /\ \A p2 \in ancestors(R, path) : mknodeSimple(R, p2) + /\ mknodeSimple(R, path) + + +mknode(R, path, recursive) == + IF recursive THEN mknodeWithParents(R, path) ELSE mknodeSimple(R, path) + +(* +Deletion is set difference on any existing entries +*) + +simpleDelete(R, path) == + /\ ~isRootPath(path) + /\ children(R, path) = {} + /\ R' = R \ resolve(R, path) + +(* +Recursive delete: neither the path or its descendants exists in the new registry +*) + +recursiveDelete(R, path) == + \* Root path: the new registry is the initial registry again + /\ isRootPath(path) => R' = { [ path |-> <<>>, data |-> <<>> ] } + \* Any other entry: the new registry is a set with any existing + \* entry for that path is removed, and the new entry added + /\ ~isRootPath(path) => R' = R \ ( resolve(R, path) \union descendants(R, path)) + + +(* +Delete operation which chooses the recursiveness policy based on an argument +*) + +delete(R, path, recursive) == + IF recursive THEN recursiveDelete(R, path) ELSE simpleDelete(R, path) + + +(* +Purge ensures that all entries under a path with the matching ID and policy are not there +afterwards +*) + +purge(R, path, id, persistence) == + /\ (persistence \in PersistPolicySet) + /\ \A p2 \in pathAndDescendants(R, path) : + (p2.attributes["yarn:id"] = id /\ p2.attributes["yarn:persistence"] = persistence) + => recursiveDelete(R, p2.path) + +(* +resolveEntry() resolves the record entry at a path or fails. + +It relies on the fact that if the cardinality of a set is 1, then the CHOOSE operator +is guaranteed to return the single entry of that set, iff the choice predicate holds. + +Using a predicate of TRUE, it always succeeds, so this function selects +the sole entry of the resolve operation. +*) + +resolveEntry(R, path) == + LET l == resolve(R, path) IN + /\ Cardinality(l) = 1 + /\ CHOOSE e \in l : TRUE + +(* + Resolve a record by resolving the entry and deserializing the result + *) +resolveRecord(R, path) == + deserialize(resolveEntry(R, path)) + + +(* +The specific action of putting an entry into a record includes validating the record +*) + +validRecordToBind(path, record) == + \* The root entry must have permanent persistence + isRootPath(path) => ( + record.attributes["yarn:persistence"] = "permanent" + \/ record.attributes["yarn:persistence"] + \/ record.attributes["yarn:persistence"] = {}) + + +(* +Binding a service record involves validating it then putting it in the registry +marshalled as the data in the entry + *) +bindRecord(R, path, record) == + /\ validRecordToBind(path, record) + /\ bind(R, [path |-> path, data |-> serialize(record)]) + + +---------------------------------------------------------------------------------------- + + +(* +The action queue can only contain one of the sets of action types, and +by giving each a unique name, those sets are guaranteed to be disjoint +*) + QueueInvariant == + /\ \A a \in actions: + \/ (a \in BindActions /\ a.type="bind") + \/ (a \in DeleteActions /\ a.type="delete") + \/ (a \in PurgeActions /\ a.type="purge") + \/ (a \in MknodeActions /\ a.type="mknode") + + +(* +Applying queued actions +*) + +applyAction(R, a) == + \/ (a \in BindActions /\ bindRecord(R, a.path, a.record) ) + \/ (a \in MknodeActions /\ mknode(R, a.path, a.recursive) ) + \/ (a \in DeleteActions /\ delete(R, a.path, a.recursive) ) + \/ (a \in PurgeActions /\ purge(R, a.path, a.id, a.persistence)) + + +(* +Apply the first action in a list and then update the actions +*) +applyFirstAction(R, a) == + /\ actions /= <<>> + /\ applyAction(R, Head(a)) + /\ actions' = Tail(a) + + +Next == applyFirstAction(registry, actions) + +(* +All submitted actions must eventually be applied. +*) + + +Liveness == <>( actions = <<>> ) + + +(* +The initial state of a registry has the root entry. +*) + +InitialRegistry == registry = { + [ path |-> <<>>, data |-> <<>> ] +} + + +(* +The valid state of the "registry" variable is defined as +Via the validRegistry predicate +*) + +ValidRegistryState == validRegistry(registry) + + + +(* +The initial state of the system +*) +InitialState == + /\ InitialRegistry + /\ ValidRegistryState + /\ actions = <<>> + + +(* +The registry has an initial state, the series of state changes driven by the actions, +and the requirement that it does act on those actions. +*) +RegistrySpec == + /\ InitialState + /\ [][Next]_vars + /\ Liveness + + +---------------------------------------------------------------------------------------- + +(* +Theorem: For all operations from that initial state, the registry state is still valid +*) +THEOREM InitialState => [] ValidRegistryState + +(* +Theorem: for all operations from that initial state, the type invariants hold +*) +THEOREM InitialState => [] TypeInvariant + +(* +Theorem: the queue invariants hold +*) +THEOREM InitialState => [] QueueInvariant + +============================================================================= http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java b/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java new file mode 100644 index 0000000..0d4a467 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractRegistryTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry; + +import org.apache.hadoop.fs.PathNotFoundException; +import org.apache.hadoop.registry.client.api.RegistryOperations; +import org.apache.hadoop.registry.client.binding.RegistryPathUtils; +import org.apache.hadoop.registry.client.types.yarn.PersistencePolicies; +import org.apache.hadoop.registry.client.types.ServiceRecord; + +import org.apache.hadoop.registry.server.services.RegistryAdminService; +import org.junit.Before; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.URISyntaxException; + + +public class AbstractRegistryTest extends AbstractZKRegistryTest { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractRegistryTest.class); + protected RegistryAdminService registry; + protected RegistryOperations operations; + + @Before + public void setupRegistry() throws IOException { + registry = new RegistryAdminService("yarnRegistry"); + operations = registry; + registry.init(createRegistryConfiguration()); + registry.start(); + operations.delete("/", true); + registry.createRootRegistryPaths(); + addToTeardown(registry); + } + + /** + * Create a service entry with the sample endpoints, and put it + * at the destination + * @param path path + * @param createFlags flags + * @return the record + * @throws IOException on a failure + */ + protected ServiceRecord putExampleServiceEntry(String path, int createFlags) throws + IOException, + URISyntaxException { + return putExampleServiceEntry(path, createFlags, PersistencePolicies.PERMANENT); + } + + /** + * Create a service entry with the sample endpoints, and put it + * at the destination + * @param path path + * @param createFlags flags + * @return the record + * @throws IOException on a failure + */ + protected ServiceRecord putExampleServiceEntry(String path, + int createFlags, + String persistence) + throws IOException, URISyntaxException { + ServiceRecord record = buildExampleServiceEntry(persistence); + + registry.mknode(RegistryPathUtils.parentOf(path), true); + operations.bind(path, record, createFlags); + return record; + } + + /** + * Assert a path exists + * @param path path in the registry + * @throws IOException + */ + public void assertPathExists(String path) throws IOException { + operations.stat(path); + } + + /** + * assert that a path does not exist + * @param path path in the registry + * @throws IOException + */ + public void assertPathNotFound(String path) throws IOException { + try { + operations.stat(path); + fail("Path unexpectedly found: " + path); + } catch (PathNotFoundException e) { + + } + } + + /** + * Assert that a path resolves to a service record + * @param path path in the registry + * @throws IOException + */ + public void assertResolves(String path) throws IOException { + operations.resolve(path); + } + +} http://git-wip-us.apache.org/repos/asf/hadoop/blob/e2a9fa84/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java ---------------------------------------------------------------------- diff --git a/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java b/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java new file mode 100644 index 0000000..3f87a39 --- /dev/null +++ b/hadoop-common-project/hadoop-registry/src/test/java/org/apache/hadoop/registry/AbstractZKRegistryTest.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.registry; + +import org.apache.commons.io.FileUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.service.Service; +import org.apache.hadoop.registry.client.api.RegistryConstants; +import org.apache.hadoop.registry.conf.RegistryConfiguration; +import org.apache.hadoop.registry.server.services.AddingCompositeService; +import org.apache.hadoop.registry.server.services.MicroZookeeperService; +import org.apache.hadoop.registry.server.services.MicroZookeeperServiceKeys; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Rule; +import org.junit.rules.TestName; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.File; +import java.io.IOException; + +public class AbstractZKRegistryTest extends RegistryTestHelper { + private static final Logger LOG = + LoggerFactory.getLogger(AbstractZKRegistryTest.class); + + private static final AddingCompositeService servicesToTeardown = + new AddingCompositeService("teardown"); + // static initializer guarantees it is always started + // ahead of any @BeforeClass methods + static { + servicesToTeardown.init(new Configuration()); + servicesToTeardown.start(); + } + + @Rule + public final Timeout testTimeout = new Timeout(10000); + + @Rule + public TestName methodName = new TestName(); + + protected static void addToTeardown(Service svc) { + servicesToTeardown.addService(svc); + } + + @AfterClass + public static void teardownServices() throws IOException { + describe(LOG, "teardown of static services"); + servicesToTeardown.close(); + } + + protected static MicroZookeeperService zookeeper; + + + @BeforeClass + public static void createZKServer() throws Exception { + File zkDir = new File("target/zookeeper"); + FileUtils.deleteDirectory(zkDir); + assertTrue(zkDir.mkdirs()); + zookeeper = new MicroZookeeperService("InMemoryZKService"); + Configuration conf = new RegistryConfiguration(); + conf.set(MicroZookeeperServiceKeys.KEY_ZKSERVICE_DIR, zkDir.getAbsolutePath()); + zookeeper.init(conf); + zookeeper.start(); + addToTeardown(zookeeper); + } + + /** + * give our thread a name + */ + @Before + public void nameThread() { + Thread.currentThread().setName("JUnit"); + } + + /** + * Returns the connection string to use + * + * @return connection string + */ + public String getConnectString() { + return zookeeper.getConnectionString(); + } + + public Configuration createRegistryConfiguration() { + Configuration conf = new RegistryConfiguration(); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_CONNECTION_TIMEOUT, 1000); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_INTERVAL, 500); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_TIMES, 10); + conf.setInt(RegistryConstants.KEY_REGISTRY_ZK_RETRY_CEILING, 10); + conf.set(RegistryConstants.KEY_REGISTRY_ZK_QUORUM, + zookeeper.getConnectionString()); + return conf; + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
