Repository: ambari Updated Branches: refs/heads/branch-dev-logsearch ffcf5328e -> 080c1ba9c
http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java new file mode 100644 index 0000000..32029ff --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/LogFeederUtil.java @@ -0,0 +1,557 @@
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.ambari.logfeeder.util;

import java.io.BufferedInputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.net.InetAddress;
import java.net.URL;
import java.net.UnknownHostException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.HashMap;
import java.util.Hashtable;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.TimeZone;

import org.apache.ambari.logfeeder.LogFeeder;
import org.apache.ambari.logfeeder.filter.Filter;
import org.apache.ambari.logfeeder.input.Input;
import org.apache.ambari.logfeeder.logconfig.LogFeederConstants;
import org.apache.ambari.logfeeder.mapper.Mapper;
import org.apache.ambari.logfeeder.metrics.MetricCount;
import org.apache.ambari.logfeeder.output.Output;
import org.apache.commons.lang3.StringUtils;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.codehaus.jackson.JsonParseException;
import org.codehaus.jackson.map.JsonMappingException;
import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.type.TypeReference;

import com.google.common.collect.ObjectArrays;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;

/**
 * This class contains utility methods used by LogFeeder
 */
public class LogFeederUtil {
  private static final Logger logger = Logger.getLogger(LogFeederUtil.class);

  private static final int HASH_SEED = 31174077;
  public final static String DATE_FORMAT = "yyyy-MM-dd HH:mm:ss.SSS";
  public final static String SOLR_DATE_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSS'Z'";
  private static Gson gson = new GsonBuilder().setDateFormat(DATE_FORMAT).create();

  private static Properties props;

  /** Per-key bookkeeping used by {@link #logErrorMessageByInterval}. */
  private static Map<String, LogHistory> logHistoryList = new Hashtable<String, LogHistory>();
  private static int logInterval = 30000; // 30 seconds

  public static String hostName = null;
  public static String ipAddress = null;

  // volatile: the field is read outside the lock in getLogfeederTempDir()
  private static volatile String logfeederTempDir = null;

  private static final Object _LOCK = new Object();

  static {
    setHostNameAndIP();
  }

  public static Gson getGson() {
    return gson;
  }

  /** One UTC formatter per thread, because SimpleDateFormat is not thread-safe. */
  private static ThreadLocal<SimpleDateFormat> dateFormatter = new ThreadLocal<SimpleDateFormat>() {
    @Override
    protected SimpleDateFormat initialValue() {
      SimpleDateFormat sdf = new SimpleDateFormat(SOLR_DATE_FORMAT);
      sdf.setTimeZone(TimeZone.getTimeZone("UTC"));
      return sdf;
    }
  };

  /**
   * Loads the LogFeeder properties. System properties serve as defaults; on top of
   * them the file named by the {@code properties} system property is loaded, or,
   * when that is unset or unusable, the classpath resource {@code propFile}.
   * Finally the {@code -name=value} entries of {@code propNVList} are applied.
   *
   * @param propFile   name of the classpath resource used as fallback
   * @param propNVList optional {@code -name=value} overrides, may be {@code null}
   * @throws Exception if neither source could be loaded, or reading the classpath
   *                   resource fails
   */
  public static void loadProperties(String propFile, String[] propNVList)
      throws Exception {
    logger.info("Loading properties. propFile=" + propFile);
    props = new Properties(System.getProperties());

    boolean propLoaded = loadPropertiesFromSystemPath();
    if (!propLoaded) {
      // Properties not yet loaded, let's try from class loader
      propLoaded = loadPropertiesFromClasspath(propFile);
    }

    if (!propLoaded) {
      logger.fatal("Properties file is not loaded.");
      throw new Exception("Properties not loaded");
    }
    updatePropertiesFromMap(propNVList);
  }

  /**
   * Loads the file named by the {@code properties} system property into {@link #props}.
   *
   * @return {@code true} only if the file existed and was read successfully
   */
  private static boolean loadPropertiesFromSystemPath() {
    String propertiesFilePath = System.getProperty("properties");
    if (propertiesFilePath == null || propertiesFilePath.isEmpty()) {
      return false;
    }
    File propertiesFile = new File(propertiesFilePath);
    if (!propertiesFile.exists() || !propertiesFile.isFile()) {
      logger.error("Properties file path set in environment, but file not found. properties file="
          + propertiesFilePath);
      return false;
    }
    logger.info("Properties file path set in environment. Loading properties file="
        + propertiesFilePath);
    boolean loaded = false;
    try (FileInputStream fileInputStream = new FileInputStream(propertiesFile)) {
      props.load(fileInputStream);
      loaded = true;
    } catch (Exception e) {
      // A failure while closing after a successful load is deliberately ignored.
      if (!loaded) {
        logger.error("Error loading properties file. properties file="
            + propertiesFile.getAbsolutePath(), e);
      }
    }
    return loaded;
  }

  /**
   * Loads the classpath resource {@code propFile} into {@link #props}.
   *
   * @return {@code false} if the resource does not exist
   * @throws IOException if the resource exists but cannot be read
   */
  private static boolean loadPropertiesFromClasspath(String propFile) throws IOException {
    // getResourceAsStream() only promises an InputStream; do not cast it.
    InputStream resource = LogFeeder.class.getClassLoader().getResourceAsStream(propFile);
    if (resource == null) {
      logger.fatal("Properties file not found in classpath. properties file name= "
          + propFile);
      return false;
    }
    logger.info("Loading properties file " + propFile + " from classpath");
    BufferedInputStream fileInputStream = new BufferedInputStream(resource);
    try {
      props.load(fileInputStream);
    } finally {
      try {
        fileInputStream.close();
      } catch (IOException ignored) {
        // nothing useful can be done if close fails
      }
    }
    return true;
  }

  /**
   * Copies {@code -name=value} command line arguments into {@link #props}.
   * Entries that are {@code null}, do not start with {@code -}, or contain no
   * {@code =} are skipped.
   */
  private static void updatePropertiesFromMap(String[] nvList) {
    if (nvList == null) {
      return;
    }
    logger.info("Trying to load additional proeprties from argument paramters. nvList.length="
        + nvList.length);
    for (String nv : nvList) {
      logger.info("Passed nv=" + nv);
      if (nv == null || !nv.startsWith("-") || nv.length() <= 1) {
        continue;
      }
      nv = nv.substring(1);
      logger.info("Stripped nv=" + nv);
      int i = nv.indexOf("=");
      if (i < 0) {
        // No separator: substring(0, -1) would throw, so ignore the argument.
        continue;
      }
      logger.info("Candidate nv=" + nv);
      String name = nv.substring(0, i);
      String value = nv.substring(i + 1);
      logger.info("Adding property from argument to properties. name="
          + name + ", value=" + value);
      props.put(name, value);
    }
  }

  /** Returns the property value, or {@code null} if unset or properties are not loaded. */
  static public String getStringProperty(String key) {
    if (props != null) {
      return props.getProperty(key);
    }
    return null;
  }

  /** Returns the property value, or {@code defaultValue} if unset or properties are not loaded. */
  static public String getStringProperty(String key, String defaultValue) {
    if (props != null) {
      return props.getProperty(key, defaultValue);
    }
    return defaultValue;
  }

  /** Reads a property as boolean; see {@link #toBoolean(String, boolean)} for the parsing rules. */
  static public boolean getBooleanProperty(String key, boolean defaultValue) {
    String strValue = getStringProperty(key);
    return toBoolean(strValue, defaultValue);
  }

  /**
   * Empty or {@code null} gives {@code defaultValue}; "true" and "yes"
   * (case-insensitive) give {@code true}; anything else gives {@code false}.
   */
  private static boolean toBoolean(String strValue, boolean defaultValue) {
    if (StringUtils.isEmpty(strValue)) {
      return defaultValue;
    }
    return strValue.equalsIgnoreCase("true") || strValue.equalsIgnoreCase("yes");
  }

  /** Reads a property as int, falling back to {@code defaultValue} when unset or unparsable. */
  static public int getIntProperty(String key, int defaultValue) {
    String strValue = getStringProperty(key);
    return objectToInt(strValue, defaultValue, ", key=" + key);
  }

  /**
   * Parses {@code objValue.toString()} as an int.
   *
   * @param objValue   value to convert, may be {@code null}
   * @param retValue   value returned when {@code objValue} is null, empty or unparsable
   * @param errMessage context appended to the error log entry on a parse failure
   */
  public static int objectToInt(Object objValue, int retValue,
      String errMessage) {
    if (objValue == null) {
      return retValue;
    }
    String strValue = objValue.toString();
    if (!StringUtils.isEmpty(strValue)) {
      try {
        retValue = Integer.parseInt(strValue);
      } catch (Throwable t) {
        logger.error("Error parsing integer value. str=" + strValue
            + ", " + errMessage);
      }
    }
    return retValue;
  }

  /**
   * Decides whether the component described by {@code valueConfigs} is enabled.
   * Without a non-empty {@code conditions} map in {@code conditionConfigs} the
   * result is the {@code is_enabled} entry of {@code valueConfigs} (default
   * {@code true}). With conditions, the result is {@code true} only if at least
   * one entry of the {@code fields} condition matches, see
   * {@link #isFieldConditionMatch(Map, String, String)}.
   */
  public static boolean isEnabled(Map<String, Object> conditionConfigs,
      Map<String, Object> valueConfigs) {
    // The value may be a Boolean when the config came from JSON, so do not cast to String.
    Object enabledValue = valueConfigs.get("is_enabled");
    boolean allow = toBoolean(enabledValue == null ? null : enabledValue.toString(), true);

    @SuppressWarnings("unchecked")
    Map<String, Object> conditions = (Map<String, Object>) conditionConfigs
        .get("conditions");
    if (conditions == null || conditions.isEmpty()) {
      return allow;
    }
    for (Map.Entry<String, Object> condition : conditions.entrySet()) {
      // Use the entry's own value: the key is matched case-insensitively, so a
      // literal lookup of "fields" could miss it.
      if ("fields".equalsIgnoreCase(condition.getKey())
          && matchesAnyField(valueConfigs, condition.getValue())) {
        return true;
      }
    }
    return false;
  }

  /**
   * Returns {@code true} if any field in {@code fieldsValue} (field name mapped to a
   * single value or a list of values) matches {@code valueConfigs}.
   */
  private static boolean matchesAnyField(Map<String, Object> valueConfigs, Object fieldsValue) {
    if (!(fieldsValue instanceof Map)) {
      return false;
    }
    for (Map.Entry<?, ?> field : ((Map<?, ?>) fieldsValue).entrySet()) {
      String fieldName = String.valueOf(field.getKey());
      Object expected = field.getValue();
      if (expected instanceof List) {
        for (Object candidate : (List<?>) expected) {
          if (candidate != null
              && isFieldConditionMatch(valueConfigs, fieldName, candidate.toString())) {
            return true;
          }
        }
      } else if (expected != null
          && isFieldConditionMatch(valueConfigs, fieldName, expected.toString())) {
        return true;
      }
    }
    return false;
  }

  /**
   * Checks, ignoring case, whether {@code configs} holds {@code stringValue} for
   * {@code fieldName}, either directly or inside its {@code add_fields} map.
   *
   * @return {@code false} if {@code stringValue} is {@code null} or nothing matches
   */
  public static boolean isFieldConditionMatch(Map<String, Object> configs,
      String fieldName, String stringValue) {
    if (stringValue == null) {
      return false;
    }
    Object fieldValue = configs.get(fieldName);
    if (fieldValue != null && stringValue.equalsIgnoreCase(fieldValue.toString())) {
      return true;
    }
    Object addFields = configs.get("add_fields");
    if (addFields instanceof Map) {
      Object addFieldValue = ((Map<?, ?>) addFields).get(fieldName);
      return addFieldValue != null
          && stringValue.equalsIgnoreCase(addFieldValue.toString());
    }
    return false;
  }

  /**
   * Logs the metric's total and its delta since the previous call, if the count
   * grew, then stores the current count and time on the metric.
   */
  public static void logStatForMetric(MetricCount metric, String prefixStr,
      String postFix) {
    long currStat = metric.count;
    long currMS = System.currentTimeMillis();
    if (currStat > metric.prevLogCount) {
      if (postFix == null) {
        postFix = "";
      }
      logger.info(prefixStr + ": total_count=" + metric.count
          + ", duration=" + (currMS - metric.prevLogMS) / 1000
          + " secs, count=" + (currStat - metric.prevLogCount)
          + postFix);
    }
    metric.prevLogCount = currStat;
    metric.prevLogMS = currMS;
  }

  /** Copies a map by serializing it to JSON and parsing it back; {@code null} stays {@code null}. */
  public static Map<String, Object> cloneObject(Map<String, Object> map) {
    if (map == null) {
      return null;
    }
    String jsonStr = gson.toJson(map);
    Type type = new TypeToken<Map<String, Object>>() {
    }.getType();
    return gson.fromJson(jsonStr, type);
  }

  /** Parses a JSON object string into a map; {@code null} or blank input gives an empty map. */
  public static Map<String, Object> toJSONObject(String jsonStr) {
    if (jsonStr == null || jsonStr.trim().isEmpty()) {
      return new HashMap<String, Object>();
    }
    Type type = new TypeToken<Map<String, Object>>() {
    }.getType();
    return gson.fromJson(jsonStr, type);
  }

  /**
   * Logs {@code message} through {@code callerLogger} at most once per
   * {@code logInterval} milliseconds for each {@code key}. Calls inside the
   * interval are only counted; the count is appended to the next emitted message.
   *
   * @return {@code true} if the message was logged, {@code false} if suppressed
   */
  static public boolean logErrorMessageByInterval(String key, String message,
      Throwable e, Logger callerLogger, Level level) {
    LogHistory log = logHistoryList.get(key);
    if (log == null) {
      log = new LogHistory();
      logHistoryList.put(key, log);
    }
    if ((System.currentTimeMillis() - log.lastLogTime) <= logInterval) {
      log.counter++;
      return false;
    }

    log.lastLogTime = System.currentTimeMillis();
    int counter = log.counter;
    log.counter = 0;
    if (counter > 0) {
      message += ". Messages suppressed before: " + counter;
    }
    if (e == null) {
      callerLogger.log(level, message);
    } else {
      callerLogger.log(level, message, e);
    }
    return true;
  }

  /**
   * Returns at most the first {@code maxLength} characters of {@code str}; the
   * empty string for {@code null}/empty input or a non-positive {@code maxLength}.
   */
  static public String subString(String str, int maxLength) {
    if (str == null || str.length() == 0 || maxLength <= 0) {
      return "";
    }
    return str.substring(0, Math.min(str.length(), maxLength));
  }

  /**
   * Returns the 64-bit Murmur hash of {@code value}; {@code null} is hashed as the
   * string "null".
   */
  public static long genHash(String value) {
    if (value == null) {
      value = "null";
    }
    // NOTE(review): getBytes() uses the platform default charset, so non-ASCII input
    // may hash differently across hosts. Left as is to keep existing hash values
    // stable; confirm whether a fixed charset is wanted.
    return MurmurHash.hash64A(value.getBytes(), HASH_SEED);
  }

  /** Last emit time and number of suppressed messages for one log key. */
  private static class LogHistory {
    private long lastLogTime = 0;
    private int counter = 0;
  }

  /**
   * Formats an epoch-millisecond string as a UTC date in {@link #SOLR_DATE_FORMAT}.
   *
   * @return the formatted date, or {@code null} if {@code timeStampStr} is not a valid long
   */
  public static String getDate(String timeStampStr) {
    try {
      return dateFormatter.get().format(new Date(Long.parseLong(timeStampStr)));
    } catch (Exception ex) {
      logger.error("Error formatting timestamp. timeStampStr=" + timeStampStr, ex);
      return null;
    }
  }

  /** Returns the current time as a UTC date in {@link #SOLR_DATE_FORMAT}. */
  public static String getActualDateStr() {
    try {
      return dateFormatter.get().format(new Date());
    } catch (Exception ex) {
      logger.error("Error formatting current date", ex);
      return null;
    }
  }

  /**
   * Resolves {@code filename} through the context class loader.
   *
   * @return the file, or {@code null} if the resource does not exist or its URL
   *         cannot be converted to a {@link File}
   */
  public static File getFileFromClasspath(String filename) {
    URL fileCompleteUrl = Thread.currentThread().getContextClassLoader()
        .getResource(filename);
    logger.debug("File Complete URI :" + fileCompleteUrl);
    if (fileCompleteUrl == null) {
      return null;
    }
    try {
      return new File(fileCompleteUrl.toURI());
    } catch (Exception exception) {
      logger.debug(exception.getMessage(), exception);
      return null;
    }
  }

  /**
   * Instantiates {@code classFullName} through its no-argument constructor and
   * checks it against the base type implied by {@code aliasType}. A type mismatch
   * is only logged; the instance is still returned.
   *
   * @return the new instance, or {@code null} if it could not be created
   */
  public static Object getClassInstance(String classFullName, AliasUtil.ALIAS_TYPE aliasType) {
    Object instance = null;
    try {
      instance = Class.forName(classFullName).getConstructor().newInstance();
    } catch (Exception exception) {
      // Log the exception itself; getCause() is null for e.g. ClassNotFoundException.
      logger.error("Unsupported class =" + classFullName, exception);
    }
    // check instance class as per aliasType
    if (instance != null) {
      boolean isValid;
      switch (aliasType) {
        case FILTER:
          isValid = instance instanceof Filter;
          break;
        case INPUT:
          isValid = instance instanceof Input;
          break;
        case OUTPUT:
          isValid = instance instanceof Output;
          break;
        case MAPPER:
          isValid = instance instanceof Mapper;
          break;
        default:
          // by default consider all are valid class
          isValid = true;
      }
      if (!isValid) {
        logger.error("Not a valid class :" + classFullName + " AliasType :" + aliasType.name());
      }
    }
    return instance;
  }

  /**
   * Reads a JSON object from {@code jsonFile}.
   *
   * @return the parsed map, or an empty map if the file cannot be read or parsed
   */
  public static HashMap<String, Object> readJsonFromFile(File jsonFile) {
    ObjectMapper mapper = new ObjectMapper();
    try {
      return mapper.readValue(jsonFile, new TypeReference<HashMap<String, Object>>() {
      });
    } catch (IOException e) {
      // Jackson's parse and mapping exceptions are IOExceptions as well.
      logger.error("Error reading json file. file=" + jsonFile, e);
    }
    return new HashMap<String, Object>();
  }

  /**
   * Returns {@code true} if {@code list} contains {@code str} (compared according to
   * {@code caseSensitive}) or contains the {@link LogFeederConstants#ALL} wildcard.
   * {@code null} entries and a {@code null} list never match.
   */
  public static boolean isListContains(List<String> list, String str, boolean caseSensitive) {
    if (list == null) {
      return false;
    }
    for (String value : list) {
      if (value == null) {
        continue;
      }
      boolean same = caseSensitive ? value.equals(str) : value.equalsIgnoreCase(str);
      if (same || value.equalsIgnoreCase(LogFeederConstants.ALL)) {
        return true;
      }
    }
    return false;
  }

  /**
   * Resolves {@link #hostName} and {@link #ipAddress} from the local host once.
   * The canonical host name is used unless it is just the IP address.
   */
  private static synchronized String setHostNameAndIP() {
    if (hostName == null || ipAddress == null) {
      try {
        InetAddress ip = InetAddress.getLocalHost();
        ipAddress = ip.getHostAddress();
        String getHostName = ip.getHostName();
        String getCanonicalHostName = ip.getCanonicalHostName();
        if (!getCanonicalHostName.equalsIgnoreCase(ipAddress)) {
          logger.info("Using getCanonicalHostName()=" + getCanonicalHostName);
          hostName = getCanonicalHostName;
        } else {
          logger.info("Using getHostName()=" + getHostName);
          hostName = getHostName;
        }
        logger.info("ipAddress=" + ipAddress + ", getHostName=" + getHostName
            + ", getCanonicalHostName=" + getCanonicalHostName + ", hostName="
            + hostName);
      } catch (UnknownHostException e) {
        logger.error("Error getting hostname.", e);
      }
    }
    return hostName;
  }

  /** Concatenates two arrays, treating {@code null} as an empty array. */
  public static String[] mergeArray(String[] first, String[] second) {
    if (first == null) {
      first = new String[0];
    }
    if (second == null) {
      second = new String[0];
    }
    return ObjectArrays.concat(first, second, String.class);
  }

  /**
   * Returns the temp directory from {@code logfeeder.tmp.dir} (default
   * {@code /tmp/$username/logfeeder/}) after placeholder substitution with the
   * {@code user.name} system property. The value is computed once and cached.
   */
  public static String getLogfeederTempDir() {
    if (logfeederTempDir == null) {
      synchronized (_LOCK) {
        if (logfeederTempDir == null) {
          String tempDirValue = getStringProperty("logfeeder.tmp.dir",
              "/tmp/$username/logfeeder/");
          HashMap<String, String> contextParam = new HashMap<String, String>();
          String username = System.getProperty("user.name");
          contextParam.put("username", username);
          logfeederTempDir = PlaceholderUtil.replaceVariables(tempDirValue,
              contextParam);
        }
      }
    }
    return logfeederTempDir;
  }
}
http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java new file mode 100644 index 0000000..dbbefaf --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/MurmurHash.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */

package org.apache.ambari.logfeeder.util;

import com.google.common.primitives.Ints;

import java.nio.ByteBuffer;
import java.nio.ByteOrder;

/**
 * This is a very fast, non-cryptographic hash suitable for general hash-based
 * lookup.  See http://murmurhash.googlepages.com/ for more details.
 * <p/>
 * <p>The C version of MurmurHash 2.0 found at that site was ported
 * to Java by Andrzej Bialecki (ab at getopt org).</p>
 */
public final class MurmurHash {

  private MurmurHash() {
  }

  /**
   * Computes the 32-bit hash of a single int.
   *
   * @param data The int to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit hash of the int's four bytes.
   */
  public static int hash(int data, int seed) {
    byte[] asBytes = Ints.toByteArray(data);
    return hash(ByteBuffer.wrap(asBytes), seed);
  }

  /**
   * Computes the 32-bit hash of a whole byte array.
   *
   * @param data The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit hash of the bytes in question.
   */
  public static int hash(byte[] data, int seed) {
    ByteBuffer wrapped = ByteBuffer.wrap(data);
    return hash(wrapped, seed);
  }

  /**
   * Computes the 32-bit hash of a slice of a byte array.
   *
   * @param data   The array holding the bytes to hash.
   * @param offset Index of the first byte to hash.
   * @param length Number of bytes to hash.
   * @param seed   The seed for the hash.
   * @return The 32-bit hash of the selected bytes.
   */
  public static int hash(byte[] data, int offset, int length, int seed) {
    ByteBuffer slice = ByteBuffer.wrap(data, offset, length);
    return hash(slice, seed);
  }

  /**
   * Computes the 32-bit hash of the bytes between the buffer's current position
   * and its limit. The bytes are consumed; the buffer's byte order is switched to
   * little-endian while hashing and set back before returning.
   *
   * @param buf  The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 32 bit murmur hash of the bytes in the buffer.
   */
  public static int hash(ByteBuffer buf, int seed) {
    // remember the caller's byte order so it can be put back at the end
    final ByteOrder callerOrder = buf.order();
    buf.order(ByteOrder.LITTLE_ENDIAN);

    final int mult = 0x5bd1e995;
    final int shift = 24;

    int acc = seed ^ buf.remaining();

    // mix in the input four bytes at a time
    while (buf.remaining() >= 4) {
      int word = buf.getInt() * mult;
      word = (word ^ (word >>> shift)) * mult;
      acc = (acc * mult) ^ word;
    }

    // mix in the remaining 1-3 bytes, zero-padded to one little-endian int
    if (buf.remaining() > 0) {
      ByteBuffer tail = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
      // for big-endian version, use this first:
      // tail.position(4-buf.remaining());
      tail.put(buf);
      tail.rewind();
      acc = (acc ^ tail.getInt()) * mult;
    }

    // final avalanche
    acc = (acc ^ (acc >>> 13)) * mult;
    acc ^= acc >>> 15;

    buf.order(callerOrder);
    return acc;
  }

  /**
   * Computes the 64-bit hash of a whole byte array.
   *
   * @param data The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 64 bit hash of the bytes in question.
   */
  public static long hash64A(byte[] data, int seed) {
    ByteBuffer wrapped = ByteBuffer.wrap(data);
    return hash64A(wrapped, seed);
  }

  /**
   * Computes the 64-bit hash of a slice of a byte array.
   *
   * @param data   The array holding the bytes to hash.
   * @param offset Index of the first byte to hash.
   * @param length Number of bytes to hash.
   * @param seed   The seed for the hash.
   * @return The 64 bit hash of the selected bytes.
   */
  public static long hash64A(byte[] data, int offset, int length, int seed) {
    ByteBuffer slice = ByteBuffer.wrap(data, offset, length);
    return hash64A(slice, seed);
  }

  /**
   * Computes the 64-bit hash of the bytes between the buffer's current position
   * and its limit. The bytes are consumed; the buffer's byte order is switched to
   * little-endian while hashing and set back before returning.
   *
   * @param buf  The bytes to hash.
   * @param seed The seed for the hash.
   * @return The 64 bit murmur hash of the bytes in the buffer.
   */
  public static long hash64A(ByteBuffer buf, int seed) {
    final ByteOrder callerOrder = buf.order();
    buf.order(ByteOrder.LITTLE_ENDIAN);

    final long mult = 0xc6a4a7935bd1e995L;
    final int shift = 47;

    long acc = seed ^ (buf.remaining() * mult);

    // mix in the input eight bytes at a time
    while (buf.remaining() >= 8) {
      long word = buf.getLong() * mult;
      word = (word ^ (word >>> shift)) * mult;
      acc = (acc ^ word) * mult;
    }

    // mix in the remaining 1-7 bytes, zero-padded to one little-endian long
    if (buf.remaining() > 0) {
      ByteBuffer tail = ByteBuffer.allocate(8).order(ByteOrder.LITTLE_ENDIAN);
      // for big-endian version, do this first:
      // tail.position(8-buf.remaining());
      tail.put(buf);
      tail.rewind();
      acc = (acc ^ tail.getLong()) * mult;
    }

    // final avalanche
    acc = (acc ^ (acc >>> shift)) * mult;
    acc ^= acc >>> shift;

    buf.order(callerOrder);
    return acc;
  }

}
http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java new file mode 100644 index 0000000..10ea2c2 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/S3Util.java @@ -0,0 +1,186 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ +package org.apache.ambari.logfeeder.util; + +import java.io.BufferedReader; +import java.io.File; +import java.io.IOException; +import java.io.InputStream; +import java.io.InputStreamReader; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.zip.GZIPInputStream; + +import org.apache.commons.io.IOUtils; +import org.apache.log4j.Logger; + +import com.amazonaws.AmazonClientException; +import com.amazonaws.auth.AWSCredentials; +import com.amazonaws.services.s3.AmazonS3; +import com.amazonaws.services.s3.AmazonS3Client; +import com.amazonaws.services.s3.model.GetObjectRequest; +import com.amazonaws.services.s3.model.ObjectMetadata; +import com.amazonaws.services.s3.model.PutObjectRequest; +import com.amazonaws.services.s3.model.S3Object; +import com.amazonaws.services.s3.transfer.TransferManager; +import com.amazonaws.services.s3.transfer.Upload; + +/** + * Utility to connect to s3 + */ +public class S3Util { + public static final S3Util INSTANCE = new S3Util(); + + private static final Logger LOG = Logger.getLogger(S3Util.class); + + public static final String S3_PATH_START_WITH = "s3://"; + public static final String S3_PATH_SEPARATOR = "/"; + + public AmazonS3 getS3Client(String accessKey, String secretKey) { + AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( + accessKey, secretKey); + AmazonS3 s3client; + if (awsCredentials != null) { + s3client = new AmazonS3Client(awsCredentials); + } else { + s3client = new AmazonS3Client(); + } + return s3client; + } + + public TransferManager getTransferManager(String accessKey, String secretKey) { + AWSCredentials awsCredentials = AWSUtil.INSTANCE.createAWSCredentials( + accessKey, secretKey); + TransferManager transferManager; + if (awsCredentials != null) { + transferManager = new TransferManager(awsCredentials); + } else { + transferManager = new TransferManager(); + } + return transferManager; + } + + public void shutdownTransferManager(TransferManager transferManager) { + 
if (transferManager != null) { + transferManager.shutdownNow(); + } + } + + public String getBucketName(String s3Path) { + String bucketName = null; + // s3path + if (s3Path != null) { + String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( + S3_PATH_SEPARATOR); + bucketName = s3PathParts[0]; + } + return bucketName; + } + + public String getS3Key(String s3Path) { + StringBuilder s3Key = new StringBuilder(); + // s3path + if (s3Path != null) { + String[] s3PathParts = s3Path.replace(S3_PATH_START_WITH, "").split( + S3_PATH_SEPARATOR); + ArrayList<String> s3PathList = new ArrayList<String>( + Arrays.asList(s3PathParts)); + s3PathList.remove(0);// remove bucketName + for (int index = 0; index < s3PathList.size(); index++) { + if (index > 0) { + s3Key.append(S3_PATH_SEPARATOR); + } + s3Key.append(s3PathList.get(index)); + } + } + return s3Key.toString(); + } + + public void uploadFileTos3(String bucketName, String s3Key, File localFile, + String accessKey, String secretKey) { + TransferManager transferManager = getTransferManager(accessKey, secretKey); + try { + Upload upload = transferManager.upload(bucketName, s3Key, localFile); + upload.waitForUploadResult(); + } catch (AmazonClientException | InterruptedException e) { + LOG.error("s3 uploading failed for file :" + localFile.getAbsolutePath(), + e); + } finally { + shutdownTransferManager(transferManager); + } + } + + /** + * Get the buffer reader to read s3 file as a stream + */ + public BufferedReader getReader(String s3Path, String accessKey, + String secretKey) throws IOException { + // TODO error handling + // Compression support + // read header and decide the compression(auto detection) + // For now hard-code GZIP compression + String s3Bucket = getBucketName(s3Path); + String s3Key = getS3Key(s3Path); + S3Object fileObj = getS3Client(accessKey, secretKey).getObject( + new GetObjectRequest(s3Bucket, s3Key)); + GZIPInputStream objectInputStream; + try { + objectInputStream = new 
GZIPInputStream(fileObj.getObjectContent()); + BufferedReader bufferedReader = new BufferedReader(new InputStreamReader( + objectInputStream)); + return bufferedReader; + } catch (IOException e) { + LOG.error("Error in creating stream reader for s3 file :" + s3Path, + e.getCause()); + throw e; + } + } + + public void writeIntoS3File(String data, String bucketName, String s3Key, + String accessKey, String secretKey) { + InputStream in = null; + try { + in = IOUtils.toInputStream(data, "UTF-8"); + } catch (IOException e) { + LOG.error(e); + } + if (in != null) { + TransferManager transferManager = getTransferManager(accessKey, secretKey); + try { + if (transferManager != null) { + transferManager.upload( + new PutObjectRequest(bucketName, s3Key, in, + new ObjectMetadata())).waitForUploadResult(); + LOG.debug("Data Uploaded to s3 file :" + s3Key + " in bucket :" + + bucketName); + } + } catch (AmazonClientException | InterruptedException e) { + LOG.error(e); + } finally { + try { + shutdownTransferManager(transferManager); + in.close(); + } catch (IOException e) { + // ignore + } + } + } + } + +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java index aaf809f..44113e1 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/main/java/org/apache/ambari/logfeeder/util/SolrUtil.java @@ -21,7 +21,6 @@ package org.apache.ambari.logfeeder.util; import java.io.IOException; import java.util.HashMap; -import org.apache.ambari.logfeeder.LogFeederUtil; import 
org.apache.ambari.logfeeder.logconfig.LogFeederConstants; import org.apache.log4j.Level; import org.apache.log4j.Logger; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java index 9f943ec..3aa8d7b 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterGrokTest.java @@ -21,9 +21,9 @@ package org.apache.ambari.logfeeder.filter; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logfeeder.OutputMgr; import org.apache.ambari.logfeeder.input.Input; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.output.OutputMgr; import org.apache.log4j.Logger; import org.easymock.Capture; import org.easymock.CaptureType; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java index cdec4df..64e9b69 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java +++ 
b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterJSONTest.java @@ -25,10 +25,10 @@ import java.util.HashMap; import java.util.Map; import java.util.TimeZone; -import org.apache.ambari.logfeeder.LogFeederUtil; -import org.apache.ambari.logfeeder.OutputMgr; -import org.apache.ambari.logfeeder.exception.LogfeederException; +import org.apache.ambari.logfeeder.common.LogfeederException; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.output.OutputMgr; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.log4j.Logger; import org.easymock.Capture; import org.easymock.CaptureType; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java index 58db8f2..849e4c3 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/filter/FilterKeyValueTest.java @@ -21,8 +21,8 @@ package org.apache.ambari.logfeeder.filter; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logfeeder.OutputMgr; import org.apache.ambari.logfeeder.input.InputMarker; +import org.apache.ambari.logfeeder.output.OutputMgr; import org.apache.log4j.Logger; import org.easymock.Capture; import org.easymock.CaptureType; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java 
---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java index 2242a83..42e81da 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/input/InputFileTest.java @@ -25,7 +25,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.ambari.logfeeder.InputMgr; import org.apache.ambari.logfeeder.filter.Filter; import org.apache.ambari.logfeeder.input.InputMarker; import org.apache.commons.io.FileUtils; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java index 2df03bd..0652182 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/mapper/MapperDateTest.java @@ -24,7 +24,7 @@ import java.util.Date; import java.util.HashMap; import java.util.Map; -import org.apache.ambari.logfeeder.LogFeederUtil; +import org.apache.ambari.logfeeder.util.LogFeederUtil; import org.apache.commons.lang3.time.DateUtils; import org.apache.log4j.Logger; import org.junit.Test; 
http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java index 49cee56..cc6da56 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3LogPathResolverTest.java @@ -18,11 +18,12 @@ package org.apache.ambari.logfeeder.output; -import org.apache.ambari.logfeeder.LogFeederUtil; import org.junit.Test; import static org.junit.Assert.assertEquals; +import org.apache.ambari.logfeeder.util.LogFeederUtil; + public class S3LogPathResolverTest { @Test http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java index a0c398e..c64e0c5 100644 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/output/S3UploaderTest.java @@ -18,7 +18,7 @@ package org.apache.ambari.logfeeder.output; -import org.apache.ambari.logfeeder.s3.S3Util; +import org.apache.ambari.logfeeder.util.S3Util; import 
org.junit.Test; import java.io.File; http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java deleted file mode 100644 index 4f0d1aa..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/AWSUtilTest.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.ambari.logfeeder.s3; - -public class AWSUtilTest { - public void testAWSUtil_getAwsUserName() throws Exception { - String S3_ACCESS_KEY = "S3_ACCESS_KEY"; - String S3_SECRET_KEY = "S3_SECRET_KEY"; - AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY); - } -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java deleted file mode 100644 index af14140..0000000 --- a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/s3/S3UtilTest.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.ambari.logfeeder.s3; - -import static org.junit.Assert.assertEquals; - -public class S3UtilTest { - public void testS3Util_pathToBucketName() throws Exception { - String s3Path = "s3://bucket_name/path/file.txt"; - String expectedBucketName = "bucket_name"; - String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path); - assertEquals(expectedBucketName, actualBucketName); - } - - public void testS3Util_pathToS3Key() throws Exception { - String s3Path = "s3://bucket_name/path/file.txt"; - String expectedS3key = "path/file.txt"; - String actualS3key = S3Util.INSTANCE.getS3Key(s3Path); - assertEquals(expectedS3key, actualS3key); - } - -} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java new file mode 100644 index 0000000..6df2283 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/AWSUtilTest.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import org.apache.ambari.logfeeder.util.AWSUtil; + +public class AWSUtilTest { + public void testAWSUtil_getAwsUserName() throws Exception { + String S3_ACCESS_KEY = "S3_ACCESS_KEY"; + String S3_SECRET_KEY = "S3_SECRET_KEY"; + AWSUtil.INSTANCE.getAwsUserName(S3_ACCESS_KEY, S3_SECRET_KEY); + } +} http://git-wip-us.apache.org/repos/asf/ambari/blob/080c1ba9/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java ---------------------------------------------------------------------- diff --git a/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java new file mode 100644 index 0000000..84554b0 --- /dev/null +++ b/ambari-logsearch/ambari-logsearch-logfeeder/src/test/java/org/apache/ambari/logfeeder/util/S3UtilTest.java @@ -0,0 +1,40 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.ambari.logfeeder.util; + +import static org.junit.Assert.assertEquals; + +import org.apache.ambari.logfeeder.util.S3Util; + +public class S3UtilTest { + public void testS3Util_pathToBucketName() throws Exception { + String s3Path = "s3://bucket_name/path/file.txt"; + String expectedBucketName = "bucket_name"; + String actualBucketName = S3Util.INSTANCE.getBucketName(s3Path); + assertEquals(expectedBucketName, actualBucketName); + } + + public void testS3Util_pathToS3Key() throws Exception { + String s3Path = "s3://bucket_name/path/file.txt"; + String expectedS3key = "path/file.txt"; + String actualS3key = S3Util.INSTANCE.getS3Key(s3Path); + assertEquals(expectedS3key, actualS3key); + } + +}
