SAMZA-769 - Replace deprecated method call and fix warnings
Project: http://git-wip-us.apache.org/repos/asf/samza/repo Commit: http://git-wip-us.apache.org/repos/asf/samza/commit/0e94975e Tree: http://git-wip-us.apache.org/repos/asf/samza/tree/0e94975e Diff: http://git-wip-us.apache.org/repos/asf/samza/diff/0e94975e Branch: refs/heads/samza-sql Commit: 0e94975e522e40f7cb89e98bd923a071e4c07301 Parents: 74aa516 Author: Aleksandar Bircakovic <[email protected]> Authored: Wed Nov 18 12:43:44 2015 -0800 Committer: Navina <[email protected]> Committed: Wed Nov 18 12:43:44 2015 -0800 ---------------------------------------------------------------------- .../java/org/apache/samza/system/SystemAdmin.java | 4 ++-- .../samza/autoscaling/deployer/ConfigManager.java | 5 +++-- .../org/apache/samza/autoscaling/utils/YarnUtil.java | 15 +++++++++------ .../apache/samza/config/JavaSerializerConfig.java | 4 ++-- .../org/apache/samza/storage/StorageRecovery.java | 1 + .../stream/TestCoordinatorStreamWriter.java | 1 + .../samza/storage/kv/RocksDbKeyValueReader.java | 2 ++ .../log4j/serializers/LoggingEventJsonSerde.java | 5 ++++- 8 files changed, 24 insertions(+), 13 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java ---------------------------------------------------------------------- diff --git a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java index bc926c5..ef99893 100644 --- a/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java +++ b/samza-api/src/main/java/org/apache/samza/system/SystemAdmin.java @@ -84,8 +84,8 @@ public interface SystemAdmin { * offset1 == offset2 and offset1 > offset2 respectively. Return * null if those two offsets are not comparable * - * @param offset1 - * @param offset2 + * @param offset1 First offset for comparison. + * @param offset2 Second offset for comparison. * @return -1 if offset1 < offset2; 0 if offset1 == offset2; 1 if offset1 > offset2. Null if not comparable */ Integer offsetComparator(String offset1, String offset2); http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java index 7089796..87346bc 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/deployer/ConfigManager.java @@ -21,6 +21,7 @@ package org.apache.samza.autoscaling.deployer; import joptsimple.OptionSet; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.samza.autoscaling.utils.YarnUtil; import org.apache.samza.config.Config; @@ -152,7 +153,6 @@ public class ConfigManager { coordinatorStreamConsumer.stop(); coordinatorServerURL = null; yarnUtil.stop(); - } /** @@ -201,6 +201,7 @@ public class ConfigManager { * @param keysToProcess a list of keys to process. Only messages with these keys will call their handler function, * and other messages will be skipped. If the list is empty all messages will be skipped. */ + @SuppressWarnings("unchecked") private void processConfigMessages(List<String> keysToProcess) { if (!coordinatorStreamConsumer.hasNewMessages(coordinatorStreamIterator)) { return; @@ -360,7 +361,7 @@ public class ConfigManager { * To run the code use the following command: * {path to samza deployment}/samza/bin/run-config-manager.sh --config-factory={config-factory} --config-path={path to config file of a job} * - * @param args + * @param args input arguments for running ConfigManager. */ public static void main(String[] args) { CommandLine cmdline = new CommandLine(); http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java ---------------------------------------------------------------------- diff --git a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java index b2d37a7..376c549 100644 --- a/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java +++ b/samza-autoscaling/src/main/java/org/apache/samza/autoscaling/utils/YarnUtil.java @@ -25,9 +25,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.http.HttpHost; import org.apache.http.HttpResponse; -import org.apache.http.client.HttpClient; import org.apache.http.client.methods.HttpGet; -import org.apache.http.impl.client.DefaultHttpClient; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.HttpClientBuilder; import org.apache.http.util.EntityUtils; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.type.TypeReference; @@ -44,13 +44,12 @@ import java.util.Map; */ public class YarnUtil { private static final Logger log = LoggerFactory.getLogger(YarnUtil.class); - private HttpClient httpclient; + private CloseableHttpClient httpclient; private HttpHost rmServer; private YarnClient yarnClient; public YarnUtil(String rmAddress, int rmPort) { - - this.httpclient = new DefaultHttpClient(); + this.httpclient = HttpClientBuilder.create().build(); this.rmServer = new HttpHost(rmAddress, rmPort, "http"); log.info("setting rm server to : " + rmServer); YarnConfiguration hConfig = new YarnConfiguration(); @@ -146,7 +145,11 @@ public class YarnUtil { * This function stops the YarnUtil by stopping the yarn client and http client. */ public void stop() { - httpclient.getConnectionManager().shutdown(); + try { + httpclient.close(); + } catch (IOException e) { + log.error("HTTP Client failed to close.", e); + } yarnClient.stop(); } http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java index 7db3e1c..946d4e2 100644 --- a/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java +++ b/samza-core/src/main/java/org/apache/samza/config/JavaSerializerConfig.java @@ -38,8 +38,8 @@ public class JavaSerializerConfig extends MapConfig { } /** - * Returns a list of all serializer names from the config file. Useful for - * getting individual serializers. + * Useful for getting individual serializers. + * @return a list of all serializer names from the config file */ public List<String> getSerdeNames() { List<String> results = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java ---------------------------------------------------------------------- diff --git a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java index c564964..0324e90 100644 --- a/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java +++ b/samza-core/src/main/java/org/apache/samza/storage/StorageRecovery.java @@ -204,6 +204,7 @@ public class StorageRecovery extends CommandLine { * create one TaskStorageManager for each task. Add all of them to the * List<TaskStorageManager> */ + @SuppressWarnings({ "unchecked", "rawtypes" }) private void getTaskStorageManagers() { StreamMetadataCache streamMetadataCache = new StreamMetadataCache(Util.javaMapAsScalaMap(systemAdmins), 5000, SystemClock.instance()); http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java ---------------------------------------------------------------------- diff --git a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java index 4eaaec2..f9c6304 100644 --- a/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java +++ b/samza-core/src/test/java/org/apache/samza/coordinator/stream/TestCoordinatorStreamWriter.java @@ -105,6 +105,7 @@ public class TestCoordinatorStreamWriter { assertTrue(systemProducer.isStopped()); } + @SuppressWarnings({ "unchecked", "rawtypes" }) public void testSendMessage() throws NoSuchMethodException, InvocationTargetException, IllegalAccessException { //check a correct message http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java ---------------------------------------------------------------------- diff --git a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java index 6dcb407..f570422 100644 --- a/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java +++ b/samza-kv-rocksdb/src/main/java/org/apache/samza/storage/kv/RocksDbKeyValueReader.java @@ -86,6 +86,8 @@ public class RocksDbKeyValueReader { * the db, it return null. * * @param key the key of the value you want to get + * @return deserialized value for the key + * Returns null, if the value doesn't exist */ public Object get(Object key) { byte[] byteKey = keySerde.toBytes(key); http://git-wip-us.apache.org/repos/asf/samza/blob/0e94975e/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java ---------------------------------------------------------------------- diff --git a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java index a18d8e0..129a4a0 100644 --- a/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java +++ b/samza-log4j/src/main/java/org/apache/samza/logging/log4j/serializers/LoggingEventJsonSerde.java @@ -50,6 +50,7 @@ public class LoggingEventJsonSerde implements Serde<LoggingEvent> { // Have to wrap rather than extend due to type collisions between // Serde<LoggingEvent> and Serde<Object>. + @SuppressWarnings("rawtypes") private final JsonSerde jsonSerde; /** @@ -68,15 +69,17 @@ public class LoggingEventJsonSerde implements Serde<LoggingEvent> { /** * Constructs the serde. - * + * * @param includeLocationInfo * Whether to include location info in the logging event or not. */ + @SuppressWarnings("rawtypes") public LoggingEventJsonSerde(boolean includeLocationInfo) { this.includeLocationInfo = includeLocationInfo; this.jsonSerde = new JsonSerde(); } + @SuppressWarnings("unchecked") @Override public byte[] toBytes(LoggingEvent loggingEvent) { Map<String, Object> loggingEventMap = encodeToMap(loggingEvent, includeLocationInfo);
