Author: liyin Date: Thu Apr 17 00:49:17 2014 New Revision: 1588118 URL: http://svn.apache.org/r1588118 Log: [master] Support arbitrary parameters and return value in endpoint.
Author: daviddeng Summary: Implements `EndpointBytesCodec` converting between primitive types and `byte` arrays. Test Plan: `TestEndpoint` was changed. `TestEndpointBytesCodec` was added. Reviewers: adela, gauravm, manukranthk Reviewed By: adela CC: hbase-eng@, andrewcox Differential Revision: https://phabricator.fb.com/D1265851 Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java - copied, changed from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/client/HTable.java Thu Apr 17 00:49:17 2014 @@ -1528,8 +1528,8 @@ getRegionCachePrefetch(new StringBytes(t } @Override - public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint( - Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller) + public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint( + Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller) throws IOException { return this.endpointClient.coprocessorEndpoint(clazz, startRow, stopRow, caller); Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java?rev=1588118&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointBytesCodec.java Thu Apr 17 00:49:17 2014 @@ -0,0 +1,258 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor.endpoints; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Bytes; + +/** + * Codec for endpoint that convert between types and byte arrays. + * + * TODO add testcase for this class. + */ +public class EndpointBytesCodec { + + /** + * The interface of decoder. + */ + public interface IBytesDecoder { + /** + * Decodes a byte array into an object. + */ + Object decode(byte[] bytes); + } + + /** + * The interface of encoder. + */ + public interface IBytesEncoder { + /** + * Encode an object into bytes. + */ + public byte[] encode(Object obj); + } + + /** + * Mapping from class types to IBytesDecoders. + */ + private static Map<Class<?>, IBytesDecoder> decoders = new HashMap<>(); + + /** + * Mapping from class types to IBytesEncoders. + */ + private static Map<Class<?>, IBytesEncoder> encoders = new HashMap<>(); + + /** + * Connects an encoder and a decoder with some classes. + * + * @param classes All classes in this array will be connected to the codecs. 
+ */ + private static void addCodec(Class<?>[] classes, IBytesEncoder enc, + IBytesDecoder dec) { + for (Class<?> cls : classes) { + encoders.put(cls, enc); + decoders.put(cls, dec); + } + } + + private static final byte[] BYTES_FALSE = { 0 }; + private static final byte[] BYTES_TRUE = { 1 }; + + static { + addCodec(new Class<?>[] { byte[].class }, new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return (byte[]) obj; + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + if (param == null) { + return HConstants.EMPTY_BYTE_ARRAY; + } + return param; + } + }); + addCodec(new Class<?>[] { String.class }, new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((String) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toString(param); + } + }); + addCodec(new Class<?>[] { Boolean.class, boolean.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return ((Boolean) obj) ? 
BYTES_TRUE : BYTES_FALSE; + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return param[0] != 0; + } + }); + addCodec(new Class<?>[] { Byte.class, byte.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return new byte[]{(Byte) obj}; + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return param[0]; + } + }); + addCodec(new Class<?>[] { Character.class, char.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((short) ((Character) obj).charValue()); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return (char) Bytes.toShort(param); + } + }); + addCodec(new Class<?>[] { Short.class, short.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((Short) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toShort(param); + } + }); + addCodec(new Class<?>[] { Integer.class, int.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((Integer) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toInt(param); + } + }); + addCodec(new Class<?>[] { Long.class, long.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((Long) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toLong(param); + } + }); + addCodec(new Class<?>[] { Float.class, float.class }, + new IBytesEncoder() { + @Override + public byte[] encode(Object obj) { + return Bytes.toBytes((Float) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toFloat(param); + } + }); + addCodec(new Class<?>[] { Double.class, double.class }, + new IBytesEncoder() { + @Override + public byte[] 
encode(Object obj) { + return Bytes.toBytes((Double) obj); + } + }, new IBytesDecoder() { + @Override + public Object decode(byte[] param) { + return Bytes.toDouble(param); + } + }); + } + + /** + * @returns an IBytesDecoder for a specified type. null is returned if not + * supported. + */ + public static IBytesDecoder findDecoder(Class<?> type) { + // TODO daviddeng support array of supported types. + return decoders.get(type); + } + + /** + * @return an IBytesEncoder for a specified type. null is returned if not + * supported + */ + public static IBytesEncoder findEncoder(Class<?> type) { + // TODO daviddeng support array of supported types. + return encoders.get(type); + } + + /** + * Decodes a byte array into an object with specified type. + */ + public static Object decode(Class<?> type, byte[] bytes) { + return decoders.get(type).decode(bytes); + } + + /** + * Encodes an Object into a byte array. + */ + public static byte[] encodeObject(Object obj) { + if (obj == null) { + // We don't distinguish null and zero-length byte array + return HConstants.EMPTY_BYTE_ARRAY; + } + + IBytesEncoder enc = findEncoder(obj.getClass()); + if (enc == null) { + new UnsupportedTypeException(obj.getClass()); + } + return enc.encode(obj); + } + + /** + * Encodes an array of Objects into an ArrayList of byte arrays. 
+ */ + public static ArrayList<byte[]> encodeArray(Object[] args) { + ArrayList<byte[]> res = new ArrayList<>(args.length); + for (int i = 0; i < args.length; i++) { + res.add(encodeObject(args[i])); + } + return res; + } + +} Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java Thu Apr 17 00:49:17 2014 @@ -19,8 +19,18 @@ */ package org.apache.hadoop.hbase.coprocessor.endpoints; +import java.io.IOException; +import java.lang.reflect.InvocationTargetException; +import java.lang.reflect.Method; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import org.apache.hadoop.hbase.DoNotRetryIOException; +import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointBytesCodec.IBytesDecoder; + /** * The manager holding all endpoint factories in the server. * @@ -29,27 +39,127 @@ public class EndpointManager { private static EndpointManager instance = new EndpointManager(); /** + * Make constructor private for singleton mode. + */ + private EndpointManager() { + } + + /** * Returns the singleton endpoint-manager */ public static EndpointManager get() { return instance; } - private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>(); + /** + * Data-structure storing information of an Endpoint. 
+ */ + public static class EndpointInfo { + private IEndpointFactory<?> factory; + private Map<String, Method> methods = new HashMap<>(); + private Map<String, IBytesDecoder[]> mthToDecs = new ConcurrentHashMap<>(); + + private Object[] encodeParams(String methodKey, ArrayList<byte[]> params) { + IBytesDecoder[] decoders = mthToDecs.get(methodKey); + Object[] res = new Object[params.size()]; + for (int i = 0; i < res.length; i++) { + res[i] = decoders[i].decode(params.get(i)); + } + return res; + } + + private static String makeMethodKey(String methodName, int nParams) { + return methodName + "/" + nParams; + } + + private static final HashSet<String> IEndpointMethodNames = new HashSet<>(); + static { + for (Method method : IEndpoint.class.getMethods()) { + IEndpointMethodNames.add(method.getName()); + } + } + + /** + * Constructor. + * + * @param iEndpoint the class of the IEndpont instance. + * @param factory the factory generating IEndpoint instances. + */ + public EndpointInfo(Class<?> iEndpoint, IEndpointFactory<?> factory) { + this.factory = factory; + + for (Method method : iEndpoint.getMethods()) { + if (IEndpointMethodNames.contains(method.getName())) { + // Ignore methods in IEndpoint + continue; + } + + Class<?>[] paramsCls = method.getParameterTypes(); + + String key = makeMethodKey(method.getName(), paramsCls.length); + this.methods.put(key, method); + + EndpointBytesCodec.IBytesDecoder[] decs = + new EndpointBytesCodec.IBytesDecoder[paramsCls.length]; + for (int i = 0; i < paramsCls.length; i++) { + IBytesDecoder dec = EndpointBytesCodec.findDecoder(paramsCls[i]); + if (dec == null) { + throw new UnsupportedTypeException(paramsCls[i]); + } + decs[i] = dec; + } + this.mthToDecs.put(key, decs); + } + } + + /** + * Calls factory to create a new instance of IEndpoint. + */ + public IEndpoint createEndpoint() { + return factory.create(); + } + + /** + * Invokes a method in the Endpoint. + * + * @param ep the IEndpoint instance. 
+ * @param methodName the name of the methods. + * @param params the encoded parameters. + * @return the encoded return results. + */ + public byte[] invoke(IEndpoint ep, String methodName, ArrayList<byte[]> params) + throws IllegalAccessException, IllegalArgumentException, + InvocationTargetException, IOException { + String methodKey = EndpointInfo.makeMethodKey(methodName, params.size()); + Method mth = methods.get(methodKey); + if (mth == null) { + // TODO daviddeng make a special exception for this + throw new DoNotRetryIOException("epName." + methodKey + + " does not exists"); + } + return EndpointBytesCodec.encodeObject(mth.invoke(ep, + encodeParams(methodKey, params))); + } + } + + private ConcurrentHashMap<String, EndpointInfo> nameFacts = + new ConcurrentHashMap<>(); /** * Returns the factory of an endpoint. */ - public IEndpointFactory<?> getFactory(String name) { + public EndpointInfo getEndpointEntry(String name) { return nameFacts.get(name); - } /** - * Register an endpoint with its factory + * Registers an endpoint with its factory + * + * @throws UnsupportedTypeException if one of the parameter's type is not + * supported. 
*/ public <T extends IEndpoint> void register(Class<T> iEndpoint, IEndpointFactory<T> factory) { - nameFacts.put(iEndpoint.getName(), factory); + nameFacts.put(iEndpoint.getName(), new EndpointInfo(iEndpoint, factory)); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointServer.java Thu Apr 17 00:49:17 2014 @@ -19,18 +19,19 @@ */ package org.apache.hadoop.hbase.coprocessor.endpoints; -import java.lang.reflect.Method; +import java.util.ArrayList; import org.apache.hadoop.hbase.DoNotRetryIOException; import org.apache.hadoop.hbase.NotServingRegionException; +import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager.EndpointInfo; import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.regionserver.HRegionServer; /** - * A endpoint server. + * An endpoint server. */ -public class EndpointServer { +public class EndpointServer implements IEndpointServer { private HRegionServer server; @@ -38,35 +39,22 @@ public class EndpointServer { this.server = server; } - /** - * Calls an endpoint on an region server. - * - * TODO make regionName a list. - * - * @param epName - * the endpoint name. - * @param methodName - * the method name. - * @param regionName - * the name of the region - * @param startRow - * the start row, inclusive - * @param stopRow - * the stop row, exclusive - * @return the computed value. 
- */ + @Override public byte[] callEndpoint(String epName, String methodName, - final byte[] regionName, final byte[] startRow, final byte[] stopRow) - throws ThriftHBaseException { + ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow, + final byte[] stopRow) throws ThriftHBaseException { try { - IEndpointFactory<?> fact = EndpointManager.get().getFactory(epName); - if (fact == null) { + EndpointInfo ent = EndpointManager.get().getEndpointEntry(epName); + if (ent == null) { // TODO daviddeng make a special exception for this throw new DoNotRetryIOException("Endpoint " + epName + " does not exists"); } - IEndpoint ep = fact.create(); + // Create an IEndpoint instance. + IEndpoint ep = ent.createEndpoint(); + + // Set the context. ep.setContext(new IEndpointContext() { @Override public HRegion getRegion() throws NotServingRegionException { @@ -84,12 +72,10 @@ public class EndpointServer { } }); - // TODO daviddeng: now we only support methods without any parameters. - Method mth = ep.getClass().getMethod(methodName); - return (byte[]) mth.invoke(ep); + // Invoke the specified method with parameters, the return value is + // encoded and returned. 
+ return ent.invoke(ep, methodName, params); } catch (Exception e) { - // TODO daviddeng if the method is not found, should throw - // DoNotRetryIOException throw new ThriftHBaseException(e); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/HTableEndpointClient.java Thu Apr 17 00:49:17 2014 @@ -23,9 +23,9 @@ import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; +import java.util.HashMap; import java.util.Map; import java.util.NavigableMap; -import java.util.TreeMap; import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HRegionInfo; @@ -33,7 +33,6 @@ import org.apache.hadoop.hbase.HServerAd import org.apache.hadoop.hbase.client.HConnection; import org.apache.hadoop.hbase.client.HTable; import org.apache.hadoop.hbase.client.ServerCallable; -import org.apache.hadoop.hbase.util.Bytes; /** * A IEndpointClient served as part of an HTable. 
@@ -63,17 +62,20 @@ public class HTableEndpointClient implem InvocationHandler handler = new InvocationHandler() { @Override - public Object invoke(Object proxy, final Method method, Object[] args) - throws Throwable { + public Object invoke(Object proxy, final Method method, + final Object[] args) throws Throwable { HConnection conn = table.getConnectionAndResetOperationContext(); - return conn.getRegionServerWithRetries(new ServerCallable<byte[]>( + return conn.getRegionServerWithRetries(new ServerCallable<Object>( table.getConnection(), table.getTableNameStringBytes(), region.getStartKey(), table.getOptions()) { @Override - public byte[] call() throws IOException { - // TODO support arguments - return server.callEndpoint(clazz.getName(), method.getName(), - region.getRegionName(), startRow, stopRow); + public Object call() throws IOException { + byte[] res = server.callEndpoint(clazz.getName(),method.getName(), + EndpointBytesCodec.encodeArray(args), region.getRegionName(), + startRow, stopRow); + + return EndpointBytesCodec.decode(method.getReturnType(), + res); } }); } @@ -84,10 +86,10 @@ public class HTableEndpointClient implem } @Override - public <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint( - Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T> caller) + public <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint( + Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller) throws IOException { - Map<byte[], byte[]> results = new TreeMap<>(Bytes.BYTES_COMPARATOR); + Map<HRegionInfo, R> results = new HashMap<>(); NavigableMap<HRegionInfo, HServerAddress> regions = table.getRegionsInfo(); @@ -95,7 +97,7 @@ public class HTableEndpointClient implem // TODO compute startRow and stopRow T ep = getEndpointProxy(clazz, region, HConstants.EMPTY_BYTE_ARRAY, HConstants.EMPTY_BYTE_ARRAY); - results.put(region.getRegionName(), caller.call(ep)); + results.put(region, caller.call(ep)); } return results; Modified: 
hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointClient.java Thu Apr 17 00:49:17 2014 @@ -22,6 +22,8 @@ package org.apache.hadoop.hbase.coproces import java.io.IOException; import java.util.Map; +import org.apache.hadoop.hbase.HRegionInfo; + /** * The interface of a client for calling a endpoint. */ @@ -30,10 +32,10 @@ public interface IEndpointClient { /** * The interface of a caller for <code>coprocessorEndpoint</code> * - * @param <T> - * The type of the endpoint interface. (NOT the implementation) + * @param <T> the type of the endpoint interface. (NOT the implementation) + * @param <R> the type of the return value. */ - public interface Caller<T extends IEndpoint> { + public interface Caller<T extends IEndpoint, R> { /** * Calls an endpoint. @@ -42,7 +44,7 @@ public interface IEndpointClient { * an RPC client. * @return the result to be put as a value in coprocessorEndpoint's results */ - byte[] call(T client) throws IOException; + R call(T client) throws IOException; } /** @@ -61,6 +63,7 @@ public interface IEndpointClient { * the caller for each region * @return a map from region name to results. 
*/ - <T extends IEndpoint> Map<byte[], byte[]> coprocessorEndpoint(Class<T> clazz, - byte[] startRow, byte[] stopRow, Caller<T> caller) throws IOException; + <T extends IEndpoint, R> Map<HRegionInfo, R> coprocessorEndpoint( + Class<T> clazz, byte[] startRow, byte[] stopRow, Caller<T, R> caller) + throws IOException; } Added: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java?rev=1588118&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java (added) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/IEndpointServer.java Thu Apr 17 00:49:17 2014 @@ -0,0 +1,55 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor.endpoints; + +import java.util.ArrayList; + +import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException; + +import com.facebook.swift.codec.ThriftField; +import com.facebook.swift.service.ThriftException; +import com.facebook.swift.service.ThriftMethod; + +/** + * The interface of a server executing endpoints. + */ +public interface IEndpointServer { + /** + * Calls an endpoint on an region server. + * + * TODO make regionName/startRow/stopRow a list. + * + * @param epName the endpoint name. + * @param methodName the method name. + * @param regionName the name of the region + * @param startRow the start row, inclusive + * @param stopRow the stop row, exclusive + * @return the computed value. + */ + @ThriftMethod(value = "callEndpoint", exception = { + @ThriftException(type = ThriftHBaseException.class, id = 1) }) + public byte[] callEndpoint(@ThriftField(name = "epName") String epName, + @ThriftField(name = "methodName") String methodName, + @ThriftField(name = "params") ArrayList<byte[]> params, + @ThriftField(name = "regionName") byte[] regionName, + @ThriftField(name = "startRow") byte[] startRow, + @ThriftField(name = "stopRow") byte[] stopRow) + throws ThriftHBaseException; +} Copied: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java (from r1588117, hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java) URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java?p2=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java&p1=hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java&r1=1588117&r2=1588118&rev=1588118&view=diff 
============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/EndpointManager.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/coprocessor/endpoints/UnsupportedTypeException.java Thu Apr 17 00:49:17 2014 @@ -19,37 +19,19 @@ */ package org.apache.hadoop.hbase.coprocessor.endpoints; -import java.util.concurrent.ConcurrentHashMap; - /** - * The manager holding all endpoint factories in the server. - * + * An exception thrown when the type of a parameter or return value of a method + * is not supported. */ -public class EndpointManager { - private static EndpointManager instance = new EndpointManager(); - - /** - * Returns the singleton endpoint-manager - */ - public static EndpointManager get() { - return instance; - } - - private ConcurrentHashMap<String, IEndpointFactory<?>> nameFacts = new ConcurrentHashMap<>(); - - /** - * Returns the factory of an endpoint. - */ - public IEndpointFactory<?> getFactory(String name) { - return nameFacts.get(name); - - } +public class UnsupportedTypeException extends RuntimeException { + private static final long serialVersionUID = 1L; /** - * Register an endpoint with its factory + * Constructor. + * + * @param cls the Class of the unsupported type. 
*/ - public <T extends IEndpoint> void register(Class<T> iEndpoint, - IEndpointFactory<T> factory) { - nameFacts.put(iEndpoint.getName(), factory); + public UnsupportedTypeException(Class<?> cls) { + super(cls + " is not supported by endpoint codec."); } } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/HRegionInterface.java Thu Apr 17 00:49:17 2014 @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.ipc; import java.io.IOException; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.hbase.HRegionInfo; @@ -65,7 +66,8 @@ public interface HRegionInterface extend * @return the computed value. 
*/ public byte[] callEndpoint(String epName, String methodName, - byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException; + ArrayList<byte[]> params, byte[] regionName, byte[] startRow, + byte[] stopRow) throws IOException; /** * Get metainfo about an HRegion Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/ThriftHRegionInterface.java Thu Apr 17 00:49:17 2014 @@ -37,6 +37,7 @@ import org.apache.hadoop.hbase.client.Ro import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.TMultiResponse; import org.apache.hadoop.hbase.client.TRowMutations; +import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer; import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; import org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException; import org.apache.hadoop.hbase.master.AssignmentPlan; @@ -56,28 +57,8 @@ import com.google.common.util.concurrent * */ @ThriftService -public interface ThriftHRegionInterface extends ThriftClientInterface { - - /** - * Calls an endpoint on an region server. - * - * TODO make regionName/startRow/stopRow a list. - * - * @param epName the endpoint name. - * @param methodName the method name. - * @param regionName the name of the region - * @param startRow the start row, inclusive - * @param stopRow the stop row, exclusive - * @return the computed value. 
- */ - @ThriftMethod(value = "callEndpoint", exception = { - @ThriftException(type = ThriftHBaseException.class, id = 1) }) - public byte[] callEndpoint(@ThriftField(name = "epName") String epName, - @ThriftField(name = "methodName") String methodName, - @ThriftField(name = "regionName") byte[] regionName, - @ThriftField(name = "startRow") byte[] startRow, - @ThriftField(name = "stopRow") byte[] stopRow) - throws ThriftHBaseException; +public interface ThriftHRegionInterface extends ThriftClientInterface, + IEndpointServer { /** * Get metainfo about an HRegion Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/ipc/thrift/HBaseToThriftAdapter.java Thu Apr 17 00:49:17 2014 @@ -1189,11 +1189,12 @@ public class HBaseToThriftAdapter implem @Override public byte[] callEndpoint(String epName, String methodName, - byte[] regionName, byte[] startRow, byte[] stopRow) throws IOException { + ArrayList<byte[]> params, byte[] regionName, byte[] startRow, + byte[] stopRow) throws IOException { preProcess(); try { - return connection.callEndpoint(epName, methodName, regionName, startRow, - stopRow); + return connection.callEndpoint(epName, methodName, params, regionName, + startRow, stopRow); } catch (ThriftHBaseException te) { Exception e = te.getServerJavaException(); handleIOException(e); Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java URL: 
http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/HRegionServer.java Thu Apr 17 00:49:17 2014 @@ -4100,8 +4100,8 @@ public class HRegionServer implements HR @Override public byte[] callEndpoint(String epName, String methodName, - final byte[] regionName, final byte[] startRow, final byte[] stopRow) - throws IOException { + ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow, + final byte[] stopRow) throws IOException { throw new NotImplementedException("HRegionserver.callEndpoint"); } Modified: hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java (original) +++ hbase/branches/0.89-fb/src/main/java/org/apache/hadoop/hbase/regionserver/ThriftHRegionServer.java Thu Apr 17 00:49:17 2014 @@ -49,6 +49,7 @@ import org.apache.hadoop.hbase.client.Sc import org.apache.hadoop.hbase.client.TMultiResponse; import org.apache.hadoop.hbase.client.TRowMutations; import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointServer; +import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointServer; import org.apache.hadoop.hbase.io.hfile.histogram.HFileHistogram.Bucket; import org.apache.hadoop.hbase.ipc.ThriftHRegionInterface; import 
org.apache.hadoop.hbase.ipc.thrift.exceptions.ThriftHBaseException; @@ -60,8 +61,6 @@ import org.apache.hadoop.io.LongWritable import org.apache.hadoop.io.MapWritable; import org.apache.hadoop.io.Writable; -import com.facebook.swift.service.ThriftException; -import com.facebook.swift.service.ThriftMethod; import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; @@ -70,10 +69,10 @@ import com.google.common.util.concurrent * */ public class ThriftHRegionServer implements ThriftHRegionInterface { - public static Log LOG = LogFactory.getLog(ThriftHRegionServer.class); + private static Log LOG = LogFactory.getLog(ThriftHRegionServer.class); private HRegionServer server; - private EndpointServer endpointServer; + private IEndpointServer endpointServer; public ThriftHRegionServer(HRegionServer server) { this.server = server; @@ -639,9 +638,9 @@ public class ThriftHRegionServer impleme @Override public byte[] callEndpoint(String epName, String methodName, - final byte[] regionName, final byte[] startRow, final byte[] stopRow) - throws ThriftHBaseException { - return endpointServer.callEndpoint(epName, methodName, regionName, + ArrayList<byte[]> params, final byte[] regionName, final byte[] startRow, + final byte[] stopRow) throws ThriftHBaseException { + return endpointServer.callEndpoint(epName, methodName, params, regionName, startRow, stopRow); } Modified: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java?rev=1588118&r1=1588117&r2=1588118&view=diff ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java (original) +++ 
hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpoint.java Thu Apr 17 00:49:17 2014 @@ -23,20 +23,16 @@ import java.io.IOException; import java.util.Map; import org.apache.hadoop.hbase.HBaseTestingUtility; +import org.apache.hadoop.hbase.HRegionInfo; import org.apache.hadoop.hbase.KeyValue; import org.apache.hadoop.hbase.client.HTableInterface; import org.apache.hadoop.hbase.client.Put; import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib; import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointLib.IAggregator; -import org.apache.hadoop.hbase.coprocessor.endpoints.EndpointManager; -import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpoint; -import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient; import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointClient.Caller; -import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointContext; -import org.apache.hadoop.hbase.coprocessor.endpoints.IEndpointFactory; import org.apache.hadoop.hbase.regionserver.HRegion; import org.apache.hadoop.hbase.util.Bytes; +import org.apache.hadoop.hbase.util.StringBytes; import org.junit.After; import org.junit.Assert; import org.junit.Before; @@ -47,12 +43,15 @@ import org.junit.Test; */ public class TestEndpoint { private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility(); - private static final byte[] TABLE_NAME = Bytes.toBytes("cp"); + private static final StringBytes TABLE_NAME = new StringBytes("cp"); private static final byte[] FAMILY_NAME = Bytes.toBytes("f"); private static final byte[] QUALITY_NAME = Bytes.toBytes("q"); @Before public void setUp() throws Exception { + TEST_UTIL.getConfiguration().set(HBaseTestingUtility.FS_TYPE_KEY, + HBaseTestingUtility.FS_TYPE_LFS); + TEST_UTIL.startMiniCluster(); // Register an endpoint in the server side. 
EndpointManager.get().register(ISummer.class, @@ -75,7 +74,7 @@ public class TestEndpoint { * */ public static interface ISummer extends IEndpoint { - byte[] sum() throws IOException; + long sum(int offset) throws IOException; } /** @@ -83,6 +82,7 @@ public class TestEndpoint { */ public static class Summer implements ISummer, IAggregator { IEndpointContext context; + int offset; long result; @Override @@ -91,21 +91,23 @@ public class TestEndpoint { } @Override - public byte[] sum() throws IOException { + public long sum(int offset) throws IOException { HRegion region = context.getRegion(); Scan scan = new Scan(); scan.addFamily(FAMILY_NAME); scan.addColumn(FAMILY_NAME, QUALITY_NAME); + this.offset = offset; this.result = 0L; EndpointLib.aggregateScan(region, scan, this); - return Bytes.toBytes(this.result); + return this.result; } @Override public void aggregate(KeyValue kv) { - this.result += Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), + long vl = Bytes.toLong(kv.getBuffer(), kv.getValueOffset(), kv.getValueLength()); + this.result += vl + offset; } } @@ -120,23 +122,42 @@ public class TestEndpoint { QUALITY_NAME, Bytes.toBytes((long) i))); } - // Calling endpoints. IEndpointClient cp = (IEndpointClient) table; - Map<byte[], byte[]> results = cp.coprocessorEndpoint(ISummer.class, null, - null, new Caller<ISummer>() { + + // Calling endpoints with zero offsets. + Map<HRegionInfo, Long> results = cp.coprocessorEndpoint(ISummer.class, null, + null, new Caller<ISummer, Long>() { @Override - public byte[] call(ISummer client) throws IOException { - return client.sum(); + public Long call(ISummer client) throws IOException { + return client.sum(0); } }); // Aggregates results from all regions long sum = 0; - for (byte[] res : results.values()) { - sum += Bytes.toLong(res); + for (Long res : results.values()) { + sum += res; } // Check the final results Assert.assertEquals("sum", 55, sum); + + // Calling endpoints with -1 offsets. 
+ results = cp.coprocessorEndpoint(ISummer.class, null, + null, new Caller<ISummer, Long>() { + @Override + public Long call(ISummer client) throws IOException { + return client.sum(-1); + } + }); + + // Aggregates results from all regions + sum = 0; + for (Long res : results.values()) { + sum += res; + } + + // Check the final results + Assert.assertEquals("sum", 45, sum); } } Added: hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java URL: http://svn.apache.org/viewvc/hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java?rev=1588118&view=auto ============================================================================== --- hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java (added) +++ hbase/branches/0.89-fb/src/test/java/org/apache/hadoop/hbase/coprocessor/endpoints/TestEndpointBytesCodec.java Thu Apr 17 00:49:17 2014 @@ -0,0 +1,65 @@ +/** + * Copyright 2014 The Apache Software Foundation + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hbase.coprocessor.endpoints; + +import java.util.ArrayList; + +import org.apache.hadoop.hbase.util.Bytes; +import org.junit.Assert; +import org.junit.Test; + +/** + * Testcases for EndpointBytesCodec. + */ +public class TestEndpointBytesCodec { + @Test + public void testBasic() throws Exception { + final Object[] VALUES = { + false, true, + (byte) 2, + (char) 3, + (short) 4, + (int) 5, + (long) 6, + (float) 7., + (double) 8., + Bytes.toBytes("9"), + }; + + ArrayList<byte[]> arrayBytes = EndpointBytesCodec.encodeArray(VALUES); + for (int i = 0; i < VALUES.length; i++) { + Object vl = VALUES[i]; + byte[] bytes = EndpointBytesCodec.encodeObject(vl); + + Assert.assertArrayEquals("element in encodedArray[" + i + "]", bytes, + arrayBytes.get(i)); + + Object decoded = EndpointBytesCodec.decode(vl.getClass(), bytes); + if (vl instanceof byte[]) { + Assert.assertArrayEquals( + "recoved value of " + Bytes.toString((byte[]) vl), + (byte[]) vl, (byte[]) decoded); + } else { + Assert.assertEquals("recoved value of " + vl, vl, decoded); + } + } + + } +}
