http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java b/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java deleted file mode 100644 index c64b02d..0000000 --- a/agents-common/src/main/java/com/xasecure/admin/client/XaAdminRESTClient.java +++ /dev/null @@ -1,390 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - package com.xasecure.admin.client; - -import java.io.File; -import java.io.FileInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.InputStream; -import java.util.List; -import java.security.KeyManagementException; -import java.security.KeyStore; -import java.security.KeyStoreException; -import java.security.NoSuchAlgorithmException; -import java.security.SecureRandom; -import java.security.UnrecoverableKeyException; -import java.security.cert.CertificateException; - -import javax.net.ssl.HostnameVerifier; -import javax.net.ssl.KeyManager; -import javax.net.ssl.KeyManagerFactory; -import javax.net.ssl.SSLContext; -import javax.net.ssl.SSLSession; -import javax.net.ssl.TrustManager; -import javax.net.ssl.TrustManagerFactory; - -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.client.urlconnection.HTTPSProperties; - -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.conf.Configuration; - -import com.xasecure.admin.client.datatype.GrantRevokeData; -import com.xasecure.admin.client.datatype.RESTResponse; -import com.xasecure.authorization.utils.StringUtil; -import com.xasecure.authorization.hadoop.config.XaSecureConfiguration; -import com.xasecure.authorization.hadoop.utils.XaSecureCredentialProvider; - - -public class XaAdminRESTClient implements XaAdminClient { - private static final Log LOG = LogFactory.getLog(XaAdminRESTClient.class); - - public static final String XASECURE_PROP_POLICYMGR_URL = "xasecure.policymgr.url"; - public static final String XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME = "xasecure.policymgr.sslconfig.filename"; - - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE = "xasecure.policymgr.clientssl.keystore"; - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_PASSWORD = "xasecure.policymgr.clientssl.keystore.password"; - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE = "xasecure.policymgr.clientssl.keystore.type"; - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL = "xasecure.policymgr.clientssl.keystore.credential.file"; - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS = "sslKeyStore"; - public static final String XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT = "jks"; - - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE = "xasecure.policymgr.clientssl.truststore"; - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_PASSWORD = "xasecure.policymgr.clientssl.truststore.password"; - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE = "xasecure.policymgr.clientssl.truststore.type"; - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL = "xasecure.policymgr.clientssl.truststore.credential.file"; - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS = "sslTrustStore"; - public static final String XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT = "jks"; - - public static final String XASECURE_SSL_KEYMANAGER_ALGO_TYPE = "SunX509" ; - public static final String XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE = "SunX509" ; - public static final String XASECURE_SSL_CONTEXT_ALGO_TYPE = "SSL" ; - - public static final String REST_EXPECTED_MIME_TYPE = "application/json" ; - - private static final String REST_URL_PATH_POLICYLIST = "/service/assets/policyList/"; - private static final String REST_URL_PATH_GRANT = "/service/assets/resources/grant"; - private static final String REST_URL_PATH_REVOKE = "/service/assets/resources/revoke"; - private static final String REST_URL_PARAM_LASTUPDATED_TIME = "epoch"; - private static final String REST_URL_PARAM_POLICY_COUNT = "policyCount"; - private static final String REST_URL_PARAM_AGENT_NAME = "agentId"; - - private String mUrl = null; - private String mSslConfigFileName = null; - private boolean mIsSSL = false; - - private String mKeyStoreURL = null; - private String mKeyStoreAlias = null; - private String mKeyStoreFile = null; - private String mKeyStoreType = null; - private String mTrustStoreURL = null; - private String mTrustStoreAlias = null; - private String mTrustStoreFile = null; - private String mTrustStoreType = null; - - - public XaAdminRESTClient() { - mUrl = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_URL); - mSslConfigFileName = XaSecureConfiguration.getInstance().get(XASECURE_PROP_POLICYMGR_SSLCONFIG_FILENAME); - - init(); - } - - public XaAdminRESTClient(String url, String sslConfigFileName) { - mUrl = url; - mSslConfigFileName = sslConfigFileName; - - init(); - } - - @Override - public String getPolicies(String repositoryName, long lastModifiedTime, int policyCount, String agentName) { - String ret = null; - Client client = null; - - try { - client = buildClient(); - - WebResource webResource = client.resource(mUrl + REST_URL_PATH_POLICYLIST + repositoryName) - .queryParam(REST_URL_PARAM_LASTUPDATED_TIME, String.valueOf(lastModifiedTime)) - .queryParam(REST_URL_PARAM_POLICY_COUNT, String.valueOf(policyCount)) - .queryParam(REST_URL_PARAM_AGENT_NAME, agentName); - - ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).get(ClientResponse.class); - - if(response != null && response.getStatus() == 200) { - ret = response.getEntity(String.class); - } - } finally { - destroy(client); - } - - return ret; - } - - @Override - public void grantPrivilege(GrantRevokeData grData) throws Exception { - Client client = null; - - try { - client = buildClient(); - - WebResource webResource = client.resource(mUrl + REST_URL_PATH_GRANT); - - ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson()); - - if(response == null || response.getStatus() != 200) { - RESTResponse resp = RESTResponse.fromClientResponse(response); - - throw new Exception(resp.getMessage()); - } - } finally { - destroy(client); - } - } - - @Override - public void revokePrivilege(GrantRevokeData grData) throws Exception { - Client client = null; - - try { - client = buildClient(); - - WebResource webResource = client.resource(mUrl + REST_URL_PATH_REVOKE); - - ClientResponse response = webResource.accept(REST_EXPECTED_MIME_TYPE).type(REST_EXPECTED_MIME_TYPE).post(ClientResponse.class, grData.toJson()); - - if(response == null || response.getStatus() != 200) { - RESTResponse resp = RESTResponse.fromClientResponse(response); - - throw new Exception(resp.getMessage()); - } - } finally { - destroy(client); - } - } - private void init() { - mIsSSL = StringUtil.containsIgnoreCase(mUrl, "https"); - - InputStream in = null ; - - try { - Configuration conf = new Configuration() ; - - in = getFileInputStream(mSslConfigFileName) ; - - if (in != null) { - conf.addResource(in); - } - - mKeyStoreURL = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL); - mKeyStoreAlias = XASECURE_POLICYMGR_CLIENT_KEY_FILE_CREDENTIAL_ALIAS; - mKeyStoreType = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE, XASECURE_POLICYMGR_CLIENT_KEY_FILE_TYPE_DEFAULT); - mKeyStoreFile = conf.get(XASECURE_POLICYMGR_CLIENT_KEY_FILE); - - mTrustStoreURL = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL); - mTrustStoreAlias = XASECURE_POLICYMGR_TRUSTSTORE_FILE_CREDENTIAL_ALIAS; - mTrustStoreType = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE, XASECURE_POLICYMGR_TRUSTSTORE_FILE_TYPE_DEFAULT); - mTrustStoreFile = conf.get(XASECURE_POLICYMGR_TRUSTSTORE_FILE); - } - catch(IOException ioe) { - LOG.error("Unable to load SSL Config FileName: [" + mSslConfigFileName + "]", ioe); - } - finally { - close(in, mSslConfigFileName); - } - } - - private synchronized Client buildClient() { - Client client = null; - - if (mIsSSL) { - KeyManager[] kmList = getKeyManagers(); - TrustManager[] tmList = getTrustManagers(); - SSLContext sslContext = getSSLContext(kmList, tmList); - ClientConfig config = new DefaultClientConfig(); - - HostnameVerifier hv = new HostnameVerifier() { - public boolean verify(String urlHostName, SSLSession session) { - return session.getPeerHost().equals(urlHostName); - } - }; - - config.getProperties().put(HTTPSProperties.PROPERTY_HTTPS_PROPERTIES, new HTTPSProperties(hv, sslContext)); - - client = Client.create(config); - } - - if(client == null) { - client = Client.create(); - } - - return client; - } - - private KeyManager[] getKeyManagers() { - KeyManager[] kmList = null; - - String keyStoreFilepwd = getCredential(mKeyStoreURL, mKeyStoreAlias); - - if (!StringUtil.isEmpty(mKeyStoreFile) && !StringUtil.isEmpty(keyStoreFilepwd)) { - InputStream in = null ; - - try { - in = getFileInputStream(mKeyStoreFile) ; - - if (in != null) { - KeyStore keyStore = KeyStore.getInstance(mKeyStoreType); - - keyStore.load(in, keyStoreFilepwd.toCharArray()); - - KeyManagerFactory keyManagerFactory = KeyManagerFactory.getInstance(XASECURE_SSL_KEYMANAGER_ALGO_TYPE); - - keyManagerFactory.init(keyStore, keyStoreFilepwd.toCharArray()); - - kmList = keyManagerFactory.getKeyManagers(); - } else { - LOG.error("Unable to obtain keystore from file [" + mKeyStoreFile + "]"); - } - } catch (KeyStoreException e) { - LOG.error("Unable to obtain from KeyStore", e); - } catch (NoSuchAlgorithmException e) { - LOG.error("SSL algorithm is available in the environment", e); - } catch (CertificateException e) { - LOG.error("Unable to obtain the requested certification ", e); - } catch (FileNotFoundException e) { - LOG.error("Unable to find the necessary SSL Keystore and TrustStore Files", e); - } catch (IOException e) { - LOG.error("Unable to read the necessary SSL Keystore and TrustStore Files", e); - } catch (UnrecoverableKeyException e) { - LOG.error("Unable to recover the key from keystore", e); - } finally { - close(in, mKeyStoreFile); - } - } - - return kmList; - } - - private TrustManager[] getTrustManagers() { - TrustManager[] tmList = null; - - String trustStoreFilepwd = getCredential(mTrustStoreURL, mTrustStoreAlias); - - if (!StringUtil.isEmpty(mTrustStoreFile) && !StringUtil.isEmpty(trustStoreFilepwd)) { - InputStream in = null ; - - try { - in = getFileInputStream(mTrustStoreFile) ; - - if (in != null) { - KeyStore trustStore = KeyStore.getInstance(mTrustStoreType); - - trustStore.load(in, trustStoreFilepwd.toCharArray()); - - TrustManagerFactory trustManagerFactory = TrustManagerFactory.getInstance(XASECURE_SSL_TRUSTMANAGER_ALGO_TYPE); - - trustManagerFactory.init(trustStore); - - tmList = trustManagerFactory.getTrustManagers(); - } else { - LOG.error("Unable to obtain keystore from file [" + mTrustStoreFile + "]"); - } - } catch (KeyStoreException e) { - LOG.error("Unable to obtain from KeyStore", e); - } catch (NoSuchAlgorithmException e) { - LOG.error("SSL algorithm is available in the environment", e); - } catch (CertificateException e) { - LOG.error("Unable to obtain the requested certification ", e); - } catch (FileNotFoundException e) { - LOG.error("Unable to find the necessary SSL Keystore and TrustStore Files", e); - } catch (IOException e) { - LOG.error("Unable to read the necessary SSL Keystore and TrustStore Files", e); - } finally { - close(in, mTrustStoreFile); - } - } - - return tmList; - } - - private SSLContext getSSLContext(KeyManager[] kmList, TrustManager[] tmList) { - try { - if(kmList != null && tmList != null) { - SSLContext sslContext = SSLContext.getInstance(XASECURE_SSL_CONTEXT_ALGO_TYPE); - - sslContext.init(kmList, tmList, new SecureRandom()); - - return sslContext; - } - } catch (NoSuchAlgorithmException e) { - LOG.error("SSL algorithm is available in the environment", e); - } catch (KeyManagementException e) { - LOG.error("Unable to initials the SSLContext", e); - } - - return null; - } - - private String getCredential(String url, String alias) { - char[] credStr = XaSecureCredentialProvider.getInstance().getCredentialString(url, alias); - - return credStr == null ? null : new String(credStr); - } - - private InputStream getFileInputStream(String fileName) throws IOException { - InputStream in = null ; - - if(! StringUtil.isEmpty(fileName)) { - File f = new File(fileName) ; - - if (f.exists()) { - in = new FileInputStream(f) ; - } - else { - in = ClassLoader.getSystemResourceAsStream(fileName) ; - } - } - - return in ; - } - - private void close(InputStream str, String filename) { - if (str != null) { - try { - str.close() ; - } catch (IOException excp) { - LOG.error("Error while closing file: [" + filename + "]", excp) ; - } - } - } - - private void destroy(Client client) { - if(client != null) { - client.destroy(); - } - } -}
http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java deleted file mode 100644 index b5228db..0000000 --- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/GrantRevokeData.java +++ /dev/null @@ -1,273 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - package com.xasecure.admin.client.datatype; - - -import java.io.IOException; -import java.util.List; -import java.util.ArrayList; - -import org.codehaus.jackson.JsonGenerationException; -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.map.annotate.JsonSerialize; -import org.codehaus.jackson.map.JsonMappingException; -import org.codehaus.jackson.map.ObjectMapper; - -import com.xasecure.authorization.utils.StringUtil; - - -@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class GrantRevokeData implements java.io.Serializable { - private static final long serialVersionUID = 1L; - - private String grantor; - private String repositoryName; - private String repositoryType; - private String databases; - private String tables; - private String columns; - private String columnFamilies; - private boolean isEnabled; - private boolean isAuditEnabled; - private boolean replacePerm; - private List<PermMap> permMapList = new ArrayList<PermMap>(); - - private static String WILDCARD_ASTERISK = "*"; - - - public GrantRevokeData() { - } - - public String getGrantor() { - return grantor; - } - - public void setGrantor(String grantor) { - this.grantor = grantor; - } - - public String getRepositoryName() { - return repositoryName; - } - - public void setRepositoryName(String repositoryName) { - this.repositoryName = repositoryName; - } - - public String getRepositoryType() { - return repositoryType; - } - - public void setRepositoryType(String repositoryType) { - this.repositoryType = repositoryType; - } - - public String getDatabases() { - return databases; - } - - public void setDatabases(String databases) { - this.databases = databases; - } - - public String getTables() { - return tables; - } - - public void setTables(String tables) { - this.tables = tables; - } - - public String getColumns() { - return columns; - } - - public void setColumns(String columns) { - this.columns = columns; - } - - public String getColumnFamilies() { - return columnFamilies; - } - - public void setColumnFamilies(String columnFamilies) { - this.columnFamilies = columnFamilies; - } - - public List<PermMap> getPermMapList() { - return permMapList; - } - - public void setPermMapList(List<PermMap> permMapList) { - this.permMapList = permMapList; - } - - - public void setHiveData(String grantor, - String repositoryName, - String databases, - String tables, - String columns, - PermMap permMap) { - this.grantor = grantor; - this.repositoryName = repositoryName; - this.repositoryType = "hive"; - this.databases = StringUtil.isEmpty(databases) ? WILDCARD_ASTERISK : databases; - this.tables = StringUtil.isEmpty(tables) ? WILDCARD_ASTERISK : tables; - this.columns = StringUtil.isEmpty(columns) ? WILDCARD_ASTERISK : columns; - this.isAuditEnabled = true; - this.isEnabled = true; - this.replacePerm = false; - this.permMapList.add(permMap); - } - - public void setHBaseData(String grantor, - String repositoryName, - String tables, - String columns, - String columnFamilies, - PermMap permMap) { - this.grantor = grantor; - this.repositoryName = repositoryName; - this.repositoryType = "hbase"; - this.tables = StringUtil.isEmpty(tables) ? WILDCARD_ASTERISK : tables; - this.columns = StringUtil.isEmpty(columns) ? WILDCARD_ASTERISK : columns; - this.columnFamilies = StringUtil.isEmpty(columnFamilies) ? WILDCARD_ASTERISK : columnFamilies; - this.isAuditEnabled = true; - this.isEnabled = true; - this.replacePerm = true; - this.permMapList.add(permMap); - } - - public String toJson() { - try { - ObjectMapper om = new ObjectMapper(); - - return om.writeValueAsString(this); - } catch (JsonGenerationException e) { - e.printStackTrace(); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - - return ""; - } - - @Override - public String toString() { - return toJson(); - } - - - @JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) - @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) - @JsonIgnoreProperties(ignoreUnknown = true) - public static class PermMap { - private List<String> userList = new ArrayList<String>(); - private List<String> groupList = new ArrayList<String>(); - private List<String> permList = new ArrayList<String>(); - - public PermMap() { - } - - public PermMap(String user, String group, String perm) { - addUser(user); - addGroup(group); - addPerm(perm); - } - - public PermMap(List<String> userList, List<String> groupList, List<String> permList) { - copyList(userList, this.userList); - copyList(groupList, this.groupList); - copyList(permList, this.permList); - } - - public List<String> getUserList() { - return userList; - } - - public List<String> getGroupList() { - return groupList; - } - - public List<String> getPermList() { - return permList; - } - - public void addUser(String user) { - addToList(user, userList); - } - - public void addGroup(String group) { - addToList(group, groupList); - } - - public void addPerm(String perm) { - addToList(perm, permList); - } - - private void addToList(String str, List<String> list) { - if(list != null && !StringUtil.isEmpty(str)) { - list.add(str); - } - } - - private void copyList(List<String> fromList, List<String> toList) { - if(fromList != null && toList != null) { - for(String str : fromList) { - addToList(str, toList); - } - } - } - - public String toJson() { - try { - ObjectMapper om = new ObjectMapper(); - - return om.writeValueAsString(this); - } catch (JsonGenerationException e) { - e.printStackTrace(); - } catch (JsonMappingException e) { - e.printStackTrace(); - } catch (IOException e) { - e.printStackTrace(); - } - - return ""; - } - - @Override - public String toString() { - return toJson(); - } - } - - public static void main(String[] args) { - GrantRevokeData grData = new GrantRevokeData(); - - System.out.println(grData.toString()); - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java b/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java deleted file mode 100644 index 389b547..0000000 --- a/agents-common/src/main/java/com/xasecure/admin/client/datatype/RESTResponse.java +++ /dev/null @@ -1,208 +0,0 @@ -/** - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.xasecure.admin.client.datatype; - -import java.util.List; - -import org.apache.log4j.Logger; -import org.codehaus.jackson.annotate.JsonAutoDetect; -import org.codehaus.jackson.annotate.JsonIgnoreProperties; -import org.codehaus.jackson.annotate.JsonAutoDetect.Visibility; -import org.codehaus.jackson.map.ObjectMapper; -import org.codehaus.jackson.map.annotate.JsonSerialize; - -import com.sun.jersey.api.client.ClientResponse; -import com.xasecure.authorization.utils.StringUtil; - - -@JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) -@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) -@JsonIgnoreProperties(ignoreUnknown = true) -public class RESTResponse { - private static Logger LOG = Logger.getLogger(RESTResponse.class); - - private int httpStatusCode; - private int statusCode; - private String msgDesc; - private List<Message> messageList; - - - public int getHttpStatusCode() { - return httpStatusCode; - } - - public void setHttpStatusCode(int httpStatusCode) { - this.httpStatusCode = httpStatusCode; - } - - public int getStatusCode() { - return statusCode; - } - - public void setStatusCode(int statusCode) { - this.statusCode = statusCode; - } - - public String getMsgDesc() { - return msgDesc; - } - - public void setMsgDesc(String msgDesc) { - this.msgDesc = msgDesc; - } - - public List<Message> getMessageList() { - return messageList; - } - - public void setMessageList(List<Message> messageList) { - this.messageList = messageList; - } - - public String getMessage() { - return StringUtil.isEmpty(msgDesc) ? ("HTTP " + httpStatusCode) : msgDesc; - } - - public static RESTResponse fromClientResponse(ClientResponse response) { - RESTResponse ret = null; - - String jsonString = response == null ? null : response.getEntity(String.class); - int httpStatus = response == null ? 0 : response.getStatus(); - - if(! StringUtil.isEmpty(jsonString)) { - ret = RESTResponse.fromJson(jsonString); - } - - if(ret == null) { - ret = new RESTResponse(); - } - - ret.setHttpStatusCode(httpStatus); - - return ret; - } - - public String toJson() { - try { - ObjectMapper om = new ObjectMapper(); - - return om.writeValueAsString(this); - } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.debug("toJson() failed", e); - } - } - - return ""; - } - - public static RESTResponse fromJson(String jsonString) { - try { - ObjectMapper om = new ObjectMapper(); - - return om.readValue(jsonString, RESTResponse.class); - } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.debug("fromJson('" + jsonString + "') failed", e); - } - } - - return null; - } - - @Override - public String toString() { - return toJson(); - } - - @JsonAutoDetect(getterVisibility = Visibility.NONE, setterVisibility = Visibility.NONE, fieldVisibility = Visibility.ANY) - @JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) - @JsonIgnoreProperties(ignoreUnknown = true) - public static class Message { - private String name; - private String rbKey; - private String message; - private Long objectId; - private String fieldName; - - public String getName() { - return name; - } - public void setName(String name) { - this.name = name; - } - public String getRbKey() { - return rbKey; - } - public void setRbKey(String rbKey) { - this.rbKey = rbKey; - } - public String getMessage() { - return message; - } - public void setMessage(String message) { - this.message = message; - } - public Long getObjectId() { - return objectId; - } - public void setObjectId(Long objectId) { - this.objectId = objectId; - } - public String getFieldName() { - return fieldName; - } - public void setFieldName(String fieldName) { - this.fieldName = fieldName; - } - - public String toJson() { - try { - ObjectMapper om = new ObjectMapper(); - - return om.writeValueAsString(this); - } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.debug("toJson() failed", e); - } - } - - return ""; - } - - public static RESTResponse fromJson(String jsonString) { - try { - ObjectMapper om = new ObjectMapper(); - - return om.readValue(jsonString, RESTResponse.class); - } catch (Exception e) { - if(LOG.isDebugEnabled()) { - LOG.debug("fromJson('" + jsonString + "') failed", e); - } - } - - return null; - } - - @Override - public String toString() { - return toJson(); - } - } -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java deleted file mode 100644 index f32c5b7..0000000 --- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/config/XaSecureConfiguration.java +++ /dev/null @@ -1,165 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - - -package com.xasecure.authorization.hadoop.config; - -import java.io.File; -import java.net.MalformedURLException; -import java.net.URL; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.log4j.Logger; - -import com.xasecure.audit.provider.AuditProviderFactory; -import com.xasecure.authorization.hadoop.constants.XaSecureHadoopConstants; - -public class XaSecureConfiguration extends Configuration { - - private static final Logger LOG = Logger.getLogger(XaSecureConfiguration.class) ; - - private static XaSecureConfiguration config = null; - - private XaSecureConfiguration() { - super(false) ; - - // - // WorkAround for having all Hadoop Configuration in the CLASSPATH first, even if it is invoked by Hive Engine. - // - // So, we look for "hive-site.xml", if it is available, take the xasecure-audit.xml file from the same location. - // If we do not see "hive-site.xml", we look for "hbase-site.xml", if found, take the xasecure-audit.xml file from the same location. - // If we do not see "hbase-site.xml", we look for "hdfs-site.xml", if found, take the xasecure-audit.xml file from the same location. - // If we do not see, we let the CLASSPATH based search to find xasecure-audit.xml file. - - - URL auditFileLocation = getXAAuditXMLFileLocation() ; - - if (auditFileLocation != null) { - addResource(auditFileLocation) ; - } - else { - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_AUDIT_FILE) ; - } - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HDFS_SECURITY_FILE); - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_KNOX_SECURITY_FILE); - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HBASE_SECURITY_FILE) ; - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_HIVE_SECURITY_FILE) ; - addResourceIfReadable(XaSecureHadoopConstants.XASECURE_STORM_SECURITY_FILE); - - } - - @SuppressWarnings("deprecation") - private void addResourceIfReadable(String aResourceName) { - String fName = getFileLocation(aResourceName) ; - if (fName != null) { - File f = new File(fName) ; - if (f.exists() && f.canRead()) { - URL fUrl = null ; - try { - fUrl = f.toURL() ; - addResource(fUrl) ; - } catch (MalformedURLException e) { - LOG.debug("Unable to find URL for the resource name [" + aResourceName +"]. Ignoring the resource:" + aResourceName); - } - } - } - } - - - public static XaSecureConfiguration getInstance() { - if (config == null) { - synchronized (XaSecureConfiguration.class) { - XaSecureConfiguration temp = config; - if (temp == null) { - config = new XaSecureConfiguration(); - } - } - } - return config; - } - - public void initAudit(AuditProviderFactory.ApplicationType appType) { - AuditProviderFactory auditFactory = AuditProviderFactory.getInstance(); - - if(auditFactory == null) { - LOG.error("Unable to find the AuditProviderFactory. (null) found") ; - return; - } - - Properties props = getProps(); - - if(props == null) { - return; - } - - if(! auditFactory.isInitDone()) { - auditFactory.init(props, appType); - } - } - - public boolean isAuditInitDone() { - AuditProviderFactory auditFactory = AuditProviderFactory.getInstance(); - - return auditFactory != null && auditFactory.isInitDone(); - } - - - @SuppressWarnings("deprecation") - public URL getXAAuditXMLFileLocation() { - URL ret = null ; - - try { - for(String cfgFile : new String[] { "hive-site.xml", "hbase-site.xml", "hdfs-site.xml" } ) { - String loc = getFileLocation(cfgFile) ; - if (loc != null) { - if (new File(loc).canRead()) { - File parentFile = new File(loc).getParentFile() ; - ret = new File(parentFile, XaSecureHadoopConstants.XASECURE_AUDIT_FILE).toURL() ; - break ; - } - } - } - } - catch(Throwable t) { - LOG.error("Unable to locate audit file location." , t) ; - ret = null ; - } - - return ret ; - } - - private String getFileLocation(String fileName) { - - String ret = null ; - - URL lurl = XaSecureConfiguration.class.getClassLoader().getResource(fileName) ; - - if (lurl == null ) { - lurl = XaSecureConfiguration.class.getClassLoader().getResource("/" + fileName) ; - } - - if (lurl != null) { - ret = lurl.getFile() ; - } - - return ret ; - } - -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java deleted file mode 100644 index 64794d1..0000000 --- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/constants/XaSecureHadoopConstants.java +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.xasecure.authorization.hadoop.constants; - -public class XaSecureHadoopConstants { - - public static final String XASECURE_AUDIT_FILE = "xasecure-audit.xml" ; - public static final String XASECURE_HDFS_SECURITY_FILE = "xasecure-hdfs-security.xml" ; - public static final String XASECURE_KNOX_SECURITY_FILE = "xasecure-knox-security.xml" ; - public static final String XASECURE_HBASE_SECURITY_FILE = "xasecure-hbase-security.xml" ; - public static final String XASECURE_HIVE_SECURITY_FILE = "xasecure-hive-security.xml" ; - public static final String XASECURE_POLICYMGR_SSL_FILE = "xasecure-policymgr-ssl.xml" ; - public static final String XASECURE_STORM_SECURITY_FILE = "xasecure-storm-security.xml" ; - - public static final String XASECURE_ADD_HDFS_PERMISSION_PROP = "xasecure.add-hadoop-authorization" ; - public static final boolean XASECURE_ADD_HDFS_PERMISSION_DEFAULT = false ; - public static final String READ_ACCCESS_TYPE = "read"; - public static final String WRITE_ACCCESS_TYPE = "write"; - public static final String EXECUTE_ACCCESS_TYPE = "execute"; - - public static final String HDFS_ROOT_FOLDER_PATH_ALT = ""; - public static final String HDFS_ROOT_FOLDER_PATH = "/"; - - public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_PROP = "hdfs.authorization.verifier.classname" ; - public static final String HDFS_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hdfs.XASecureAuthorizer" ; - - public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_PROP = "hive.authorization.verifier.classname" ; - public static final String HIVE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hive.XASecureAuthorizer" ; - - public static final String HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP = "xasecure.hive.update.xapolicies.on.grant.revoke" ; - public static final boolean HIVE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true; - - public static final String HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_PROP = "xasecure.hbase.update.xapolicies.on.grant.revoke" ; - public static final boolean HBASE_UPDATE_XAPOLICIES_ON_GRANT_REVOKE_DEFAULT_VALUE = true; - - public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_PROP = "knox.authorization.verifier.classname" ; - public static final String KNOX_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.knox.XASecureAuthorizer" ; - - public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_PROP = "hbase.authorization.verifier.classname" ; - public static final String HBASE_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.hbase.XASecureAuthorizer" ; - - public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_PROP = "storm.authorization.verifier.classname" ; - public static final String STORM_ACCESS_VERIFIER_CLASS_NAME_DEFAULT_VALUE = "com.xasecure.pdp.storm.XASecureAuthorizer" ; - - // - // Loging constants - // - public static final String AUDITLOG_FIELD_DELIMITER_PROP = "xasecure.auditlog.fieldDelimiterString"; - public static final String AUDITLOG_XASECURE_MODULE_ACL_NAME_PROP = "xasecure.auditlog.xasecureAcl.name" ; - public static final String AUDITLOG_HADOOP_MODULE_ACL_NAME_PROP = "xasecure.auditlog.hadoopAcl.name" ; - - public static final String DEFAULT_LOG_FIELD_DELIMITOR = "|" ; - public static final String DEFAULT_XASECURE_MODULE_ACL_NAME = "xasecure-acl" ; - public static final String DEFAULT_HADOOP_MODULE_ACL_NAME = "hadoop-acl" ; - - - public static final String AUDITLOG_FIELDINFO_VISIBLE_PROP = "xasecure.auditlog.fieldInfoVisible" ; - public static final boolean DEFAULT_AUDITLOG_FIELDINFO_VISIBLE = false ; - - public static final String AUDITLOG_ACCESS_GRANTED_TEXT_PROP = "xasecure.auditlog.accessgranted.text" ; - public static final String AUDITLOG_ACCESS_DENIED_TEXT_PROP = "xasecure.auditlog.accessdenied.text" ; - - public static final String DEFAULT_ACCESS_GRANTED_TEXT = "granted" ; - public static final String DEFAULT_ACCESS_DENIED_TEXT = "denied" ; - - public static final String AUDITLOG_EMPTY_STRING = "" ; - - public static final String AUDITLOG_HDFS_EXCLUDE_LIST_PROP = "xasecure.auditlog.hdfs.excludeusers" ; - public static final String AUDITLOG_REPOSITORY_NAME_PROP = "xasecure.audit.repository.name" ; - public static final String AUDITLOG_IS_ENABLED_PROP = "xasecure.audit.is.enabled" ; - - public static final String KEYMGR_URL_PROP = "hdfs.keymanager.url" ; -} http://git-wip-us.apache.org/repos/asf/incubator-ranger/blob/413fcb68/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java ---------------------------------------------------------------------- diff --git a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java b/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java deleted file mode 100644 index 973897f..0000000 --- a/agents-common/src/main/java/com/xasecure/authorization/hadoop/log/HdfsFileAppender.java +++ /dev/null @@ -1,1376 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package com.xasecure.authorization.hadoop.log; - -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileNotFoundException; -import java.io.FileOutputStream; -import java.io.FilenameFilter; -import java.io.IOException; -import java.io.OutputStream; -import java.io.OutputStreamWriter; -import java.io.Writer; -import java.net.InetAddress; -import java.net.UnknownHostException; -import java.security.PrivilegedExceptionAction; -import java.text.SimpleDateFormat; -import java.util.Calendar; -import java.util.Date; -import java.util.GregorianCalendar; -import java.util.Locale; -import java.util.TimeZone; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FSDataOutputStream; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.FileUtil; -import org.apache.hadoop.security.UserGroupInformation; -import org.apache.log4j.FileAppender; -import org.apache.log4j.Layout; -import org.apache.log4j.spi.LoggingEvent; -import org.apache.log4j.helpers.LogLog; - - -/******************************** -* HdfsFileAppender -* -**********************************/ -public class HdfsFileAppender extends FileAppender { - - - // Constants for checking the DatePattern - public static final String MINUTES ="Min"; - public static final String HOURS ="Hr"; - public static final String DAYS ="Day"; - public static final String WEEKS ="Week"; - public static final String MONTHS ="Month"; - - // The code assumes that the following constants are in a increasing sequence. - public static final int TOP_OF_TROUBLE = -1; - public static final int TOP_OF_MINUTE = 0; - public static final int TOP_OF_HOUR = 1; - public static final int HALF_DAY = 2; - public static final int TOP_OF_DAY = 3; - public static final int TOP_OF_WEEK = 4; - public static final int TOP_OF_MONTH = 5; - - /** - * The date pattern. By default, the pattern is set to "1Day" meaning daily rollover. - */ - private String hdfsFileRollingInterval = "1Day"; - - private String fileRollingInterval = "1Day"; - - - private String scheduledFileCache; - - /** - * The next time we estimate a rollover should occur. - */ - private long nextCheck = System.currentTimeMillis() - 1; - - private long prevnextCheck = nextCheck; - - private long nextCheckLocal = System.currentTimeMillis() -1; - - private long prevnextCheckLocal = nextCheckLocal; - - private Date now = new Date(); - - private Date nowLocal = now; - - private SimpleDateFormat sdf; - - private RollingCalendar rc = new RollingCalendar(); - - private RollingCalendar rcLocal = new RollingCalendar(); - - private FileOutputStream ostream = null; - - private String fileSystemName; - - private Layout layout = null; - - private String encoding = null; - - private String hdfsfileName = null; - - private String actualHdfsfileName = null; - - private String scheduledHdfsFileName = null; - - private String fileCache = null; - - private HdfsSink hs = null; - - private Writer cacheWriter = null; - - private FileOutputStream cacheOstream = null; - - private boolean hdfsAvailable = false; - - private long hdfsNextCheck = System.currentTimeMillis() - 1; - - private boolean timeCheck = false; - - private int hdfsFileRollOffset = 0; - - private int fileRollOffset = 0; - - private boolean firstTime = true; - - private boolean firstTimeLocal = true; - - private String hdfsLiveUpdate = "true"; - - boolean hdfsUpdateAllowed = true; - - private String hdfsCheckInterval=null; - - private String processUser = null; - - private String datePattern = "'.'yyyy-MM-dd-HH-mm"; - - /** - * The gmtTimeZone is used only in computeCheckPeriod() method. - */ - private static final TimeZone gmtTimeZone = TimeZone.getTimeZone("GMT+0"); - - private static final String DEFAULT_HDFSCHECKINTERVAL = "2min"; - - /** - * The default constructor does nothing. - */ - public HdfsFileAppender() { - - } - - - /** - * The <b>hdfsFileRollingInterval</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for hdfs File rollover schedule. - */ - public void setHdfsFileRollingInterval(String pattern) { - hdfsFileRollingInterval = pattern; - } - - /** Returns the value of the <b>hdfsFileRollingInterval</b> option. */ - public String getHdfsFileRollingInterval() { - return hdfsFileRollingInterval; - } - - /** - * The <b>LocalDatePattern</b> takes a string like 1min, 5min,... 1hr, 2hrs,.. 1day, 2days... 1week, 2weeks.. 1month, 2months.. for local cache File rollover schedule. - */ - public void setFileRollingInterval(String pattern) { - fileRollingInterval = pattern; - } - - /** Returns the value of the <b>FileRollingInterval</b> option. */ - public String getFileRollingInterval() { - return fileRollingInterval; - } - - /** - * This will set liveHdfsUpdate flag , where true will update hdfs live else false will create local cache files and copy the files to hdfs - */ - public void setHdfsLiveUpdate(String val) { - hdfsLiveUpdate=val; - } - - /** Returns the value of the <b>FileRollingInterval</b> option. */ - public String getHdfsLiveUpdate() { - return hdfsLiveUpdate; - } - - public String getHdfsCheckInterval() { - return hdfsCheckInterval; - } - - public void setHdfsCheckInterval(String val){ - hdfsCheckInterval = ( hdfsCheckInterval != null) ? val : DEFAULT_HDFSCHECKINTERVAL; - } - - public String getEncoding() { - return encoding; - } - - public void setEncoding(String value) { - encoding = value; - } - - public void activateOptions() { - super.activateOptions(); - - sdf = new SimpleDateFormat(datePattern); - processUser=System.getProperties().getProperty("user.name"); - - if(hdfsFileRollingInterval != null && fileName != null) { - now.setTime(System.currentTimeMillis()); - int type = computeCheckPeriod(hdfsFileRollingInterval); - hdfsFileRollOffset = getTimeOffset(hdfsFileRollingInterval); - printHdfsPeriodicity(type,hdfsFileRollOffset); - rc.setType(type); - LogLog.debug("File name: " + fileName); - File file = new File(fileName); - scheduledHdfsFileName = hdfsfileName+sdf.format(new Date(file.lastModified())); - firstTime = true; - LogLog.debug("Local and hdfs Files" + scheduledHdfsFileName + " " +scheduledHdfsFileName) ; - - } else { - LogLog.error("Either File or hdfsFileRollingInterval options are not set for appender [" + name + "]."); - } - - // Local Cache File - - if (fileRollingInterval != null && fileCache != null){ - nowLocal.setTime(System.currentTimeMillis()); - int localtype = computeCheckPeriod(fileRollingInterval); - fileRollOffset = getTimeOffset(fileRollingInterval); - printLocalPeriodicity(localtype,fileRollOffset); - rcLocal.setType(localtype); - LogLog.debug("LocalCacheFile name: " + fileCache); - File fileCachehandle = new File(fileCache); - scheduledFileCache = fileCache+sdf.format(new Date(fileCachehandle.lastModified())); - firstTimeLocal = true; - - } else { - LogLog.error("Either File or LocalDatePattern options are not set for appender [" + name + "]."); - } - - hdfsUpdateAllowed = Boolean.parseBoolean(hdfsLiveUpdate); - actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); - } - - public static int containsIgnoreCase(String str1, String str2) { - return str1.toLowerCase().indexOf(str2.toLowerCase()); - } - - - public int computeCheckPeriod(String timePattern){ - - if(containsIgnoreCase(timePattern, MINUTES) > 0) { - return TOP_OF_MINUTE; - } - - if(containsIgnoreCase(timePattern, HOURS) > 0) { - return TOP_OF_HOUR; - } - - if(containsIgnoreCase(timePattern, DAYS) > 0) { - return TOP_OF_DAY; - } - - if(containsIgnoreCase(timePattern, WEEKS) > 0) { - return TOP_OF_WEEK; - } - - if(containsIgnoreCase(timePattern, MONTHS) > 0) { - return TOP_OF_MONTH; - } - - return TOP_OF_TROUBLE; - } - - - private void printHdfsPeriodicity(int type, int offset) { - switch(type) { - case TOP_OF_MINUTE: - LogLog.debug("Appender [" + name + "] to be rolled every " + offset + " minute."); - break; - case TOP_OF_HOUR: - LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " hour."); - break; - case HALF_DAY: - LogLog.debug("Appender [" + name + "] to be rolled at midday and midnight."); - break; - case TOP_OF_DAY: - LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " day."); - break; - case TOP_OF_WEEK: - LogLog.debug("Appender [" + name + "] to be rolled on top of every " + offset + " week."); - break; - case TOP_OF_MONTH: - LogLog.debug("Appender [" + name + "] to be rolled at start of every " + offset + " month."); - break; - default: - LogLog.warn("Unknown periodicity for appender [" + name + "]."); - } - } - - - public int getTimeOffset(String timePattern){ - int index; - int offset=-1; - - if ((index = containsIgnoreCase(timePattern, MINUTES)) > 0) { - offset = Integer.parseInt(timePattern.substring(0,index)); - } - - if ((index = containsIgnoreCase(timePattern, HOURS)) > 0) { - offset = Integer.parseInt(timePattern.substring(0,index)); - - } - - if ((index = containsIgnoreCase(timePattern, DAYS)) > 0) { - offset = Integer.parseInt(timePattern.substring(0,index)); - - } - - if ((index = containsIgnoreCase(timePattern, WEEKS)) > 0) { - offset = Integer.parseInt(timePattern.substring(0,index)); - - } - - if ((index = containsIgnoreCase(timePattern, MONTHS)) > 0) { - offset = Integer.parseInt(timePattern.substring(0,index)); - - } - - return offset; - } - - private void printLocalPeriodicity(int type, int offset) { - switch(type) { - case TOP_OF_MINUTE: - LogLog.debug("Appender [" + name + "] Local File to be rolled every " + offset + " minute."); - break; - case TOP_OF_HOUR: - LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " hour."); - break; - case HALF_DAY: - LogLog.debug("Appender [" + name + "] Local File to be rolled at midday and midnight."); - break; - case TOP_OF_DAY: - LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " day."); - break; - case TOP_OF_WEEK: - LogLog.debug("Appender [" + name + "] Local File to be rolled on top of every " + offset + " week."); - break; - case TOP_OF_MONTH: - LogLog.debug("Appender [" + name + "] Local File to be rolled at start of every " + offset + " month."); - break; - default: - LogLog.warn("Unknown periodicity for appender [" + name + "]."); - } - } - - - - - /** - * Rollover the current file to a new file. - */ - private void rollOver() throws IOException { - /* Compute filename, but only if hdfsFileRollingInterval is specified */ - if(hdfsFileRollingInterval == null) { - errorHandler.error("Missing hdfsFileRollingInterval option in rollOver()."); - return; - } - - long epochNow = System.currentTimeMillis(); - - String datedhdfsFileName = hdfsfileName+sdf.format(epochNow); - - LogLog.debug("In rollOver epochNow" + epochNow + " " + "nextCheck: " + prevnextCheck ); - - - // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached. - - if (epochNow < prevnextCheck) { - return; - } - - // close current file, and rename it to datedFilename - this.closeFile(); - - LogLog.debug("Rolling Over hdfs file to " + scheduledHdfsFileName); - - - if ( hdfsAvailable ) { - // for hdfs file we don't rollover the fike, we rename the file. - actualHdfsfileName = hdfsfileName + sdf.format(System.currentTimeMillis()); - } - - try { - // This will also close the file. This is OK since multiple close operations are safe. - this.setFile(fileName, false, this.bufferedIO, this.bufferSize); - } catch(IOException e) { - errorHandler.error("setFile(" + fileName + ", false) call failed."); - } - scheduledHdfsFileName = datedhdfsFileName; - } - - - /** - * Rollover the current Local file to a new file. - */ - private void rollOverLocal() throws IOException { - /* Compute filename, but only if datePattern is specified */ - if(fileRollingInterval == null) { - errorHandler.error("Missing LocalDatePattern option in rollOverLocal()."); - return; - } - - long epochNow = System.currentTimeMillis(); - - String datedCacheFileName = fileCache+sdf.format(epochNow); - LogLog.debug("In rollOverLocal() epochNow" + epochNow + " " + "nextCheckLocal: " + prevnextCheckLocal ); - - // It is too early to roll over because we are still within the bounds of the current interval. Rollover will occur once the next interval is reached. - if (epochNow < prevnextCheckLocal ) { - return; - } - - if (new File(fileCache).length() != 0 ) { - LogLog.debug("Rolling Local cache to " + scheduledFileCache); - - this.closeCacheWriter(); - - File target = new File(scheduledFileCache); - if (target.exists()) { - target.delete(); - } - - File file = new File(fileCache); - - boolean result = file.renameTo(target); - - if(result) { - LogLog.debug(fileCache +" -> "+ scheduledFileCache); - } else { - LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+scheduledFileCache+"]."); - } - setFileCacheWriter(); - scheduledFileCache = datedCacheFileName; - } - } - - - /** - * <p> - * Sets and <i>opens</i> the file where the log output will go. The specified file must be writable. - * <p> - * If there was already an opened file, then the previous file is closed first. - * <p> - * <b>Do not use this method directly. To configure a FileAppender or one of its subclasses, set its properties one by one and then call - * activateOptions.</b> - * - * @param fileName The path to the log file. - * @param append If true will append to fileName. Otherwise will truncate fileName. - */ - public void setFile(String file) { - // Trim spaces from both ends. The users probably does not want - // trailing spaces in file names. - String val = file.trim(); - - fileName=val; - fileCache=val+".cache"; - - } - - @Override - public synchronized void setFile(String fileName, boolean append, boolean bufferedIO, int bufferSize) throws IOException { - LogLog.debug("setFile called: "+fileName+", "+append); - - // It does not make sense to have immediate flush and bufferedIO. - if(bufferedIO) { - setImmediateFlush(false); - } - - reset(); - - try { - // - // attempt to create file - // - ostream = new FileOutputStream(fileName, append); - } catch(FileNotFoundException ex) { - // - // if parent directory does not exist then - // attempt to create it and try to create file - // see bug 9150 - // - File umFile = new File(fileName); - String parentName = umFile.getParent(); - - if (parentName != null) { - File parentDir = new File(parentName); - if(!parentDir.exists() && parentDir.mkdirs()) { - ostream = new FileOutputStream(fileName, append); - } else { - throw ex; - } - } else { - throw ex; - } - } - - Writer fw = createWriter(ostream); - if(bufferedIO) { - fw = new BufferedWriter(fw, bufferSize); - } - this.setQWForFiles(fw); - this.fileName = fileName; - this.fileAppend = append; - this.bufferedIO = bufferedIO; - this.bufferSize = bufferSize; - - //set cache file - setFileCacheWriter(); - - writeHeader(); - - LogLog.debug("setFile ended"); - } - - public void setHdfsDestination(final String name) { - //Setting the fileSystemname - - String hostName = null; - - String val = name.trim(); - - try { - - hostName = InetAddress.getLocalHost().getHostName(); - val=val.replaceAll("%hostname%", hostName); - String hostStr[] = val.split(":"); - if ( hostStr.length > 0 ) { - fileSystemName = hostStr[0]+":"+hostStr[1]+":"+hostStr[2]; - - hdfsfileName = hostStr[3]; - - } else { - LogLog.error("Failed to set HdfsSystem and File"); - } - - } catch (UnknownHostException uhe) { - LogLog.error("Setting the Hdfs Desitination Failed", uhe); - } - - LogLog.debug("FileSystemName:" + fileSystemName + "fileName:"+ hdfsfileName); - - } - - /** - * This method differentiates HdfsFileAppender from its super class. - * <p> - * Before actually logging, this method will check whether it is time to do a rollover. If it is, it will schedule the next rollover time and then rollover. - */ - @Override - protected void subAppend(LoggingEvent event) { - LogLog.debug("Called subAppend for logging into hdfs..."); - - long n = System.currentTimeMillis(); - if(n >= nextCheck) { - now.setTime(n); - prevnextCheck = nextCheck; - nextCheck = rc.getNextCheckMillis(now,hdfsFileRollOffset); - if ( firstTime) { - prevnextCheck = nextCheck; - firstTime = false; - } - try { - if (hdfsUpdateAllowed) { - rollOver(); - } - } catch(IOException e) { - LogLog.error("rollOver() failed.", e); - } - } - - long nLocal = System.currentTimeMillis(); - if ( nLocal > nextCheckLocal ) { - nowLocal.setTime(nLocal); - prevnextCheckLocal = nextCheckLocal; - nextCheckLocal = rcLocal.getNextCheckMillis(nowLocal, fileRollOffset); - if ( firstTimeLocal) { - prevnextCheckLocal = nextCheckLocal; - firstTimeLocal = false; - } - try { - rollOverLocal(); - } catch(IOException e) { - LogLog.error("rollOverLocal() failed.", e); - } - } - - this.layout = this.getLayout(); - this.encoding = this.getEncoding(); - - // Append HDFS - appendHDFSFileSystem(event); - - - //super.subAppend(event); - } - - @Override - protected - void reset() { - closeWriter(); - this.qw = null; - //this. - this.closeHdfsWriter(); - this.closeCacheWriter(); - } - - @Override - public synchronized void close() { - LogLog.debug("Closing all resource.."); - this.closeFile(); - this.closeHdfsWriter(); - this.closeHdfsOstream(); - this.closeFileSystem(); - } - - @Override - protected void closeFile() { - try { - if(this.ostream != null) { - this.ostream.close(); - this.ostream = null; - } - } catch(IOException ie) { - LogLog.error("unable to close output stream", ie); - } - this.closeHdfsWriter(); - this.closeHdfsOstream(); - } - - @Override - protected void closeWriter() { - try { - if(this.qw != null) { - this.qw.close(); - this.qw = null; - } - } catch(IOException ie) { - LogLog.error("unable to close writer", ie); - } - } - - @Override - public void finalize() { - super.finalize(); - close(); - } - - - /******* HDFS Appender methods ***********/ - - private void appendHDFSFileSystem(LoggingEvent event) { - - long currentTime = System.currentTimeMillis(); - - try { - - if ( currentTime >= hdfsNextCheck ) { - - LogLog.debug("About to Open fileSystem" + fileSystemName+" "+actualHdfsfileName) ; - hs = openHdfsSink(fileSystemName,actualHdfsfileName,fileCache,fileAppend,bufferedIO,bufferSize,layout,encoding,scheduledFileCache,cacheWriter,hdfsUpdateAllowed,processUser); - if (hdfsUpdateAllowed) { - // stream into hdfs only when liveHdfsUpdate flag is true else write to cache file. - hs.setOsteam(); - hs.setWriter(); - hs.append(event); - } else { - writeToCache(event); - } - hdfsAvailable = true; - - } else { - // Write the Log To cache file util time to check hdfs availability - hdfsAvailable = false; - LogLog.debug("Hdfs Down..Will check hdfs vailability after " + hdfsNextCheck + "Current Time :" +hdfsNextCheck ) ; - writeToCache(event); - } - } - catch(Throwable t) { - // Write the Log To cache file if hdfs connect error out. - hdfsAvailable = false; - if ( !timeCheck ) { - int hdfscheckInterval = getTimeOffset(hdfsCheckInterval); - hdfsNextCheck = System.currentTimeMillis()+(1000*60*hdfscheckInterval); - timeCheck = true; - LogLog.debug("Hdfs Down..Will check hdfs vailability after " + hdfsCheckInterval , t) ; - - } - writeToCache(event); - } - - } - - - private HdfsSink openHdfsSink(String fileSystemName,String filename, String fileCache, boolean append, boolean bufferedIO,int bufferSize,Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter,boolean hdfsUpdateAllowed,String processUser) throws Throwable { - - HdfsSink hs = null; - hs = HdfsSink.getInstance(); - if ( hs != null) - - LogLog.debug("Hdfs Sink successfully instatiated"); - try { - hs.init(fileSystemName, filename, fileCache, append, bufferedIO, bufferSize, layout, encoding,scheduledCacheFile,cacheWriter,hdfsUpdateAllowed,processUser); - - } catch (Throwable t) { - throw t; - } - return hs; - - } - - private void closeHdfsOstream() { - if (hs != null ){ - LogLog.debug("Closing hdfs outstream") ; - hs.closeHdfsOstream(); - } - } - - private void closeHdfsWriter() { - - if (hs != null) { - LogLog.debug("Closing hdfs Writer") ; - hs.closeHdfsWriter(); - } - } - - private void closeFileSystem() { - hs.closeHdfsSink(); - } - - - - /****** Cache File Methods **/ - - - public void setFileCacheWriter() { - - try { - setFileCacheOstream(fileCache); - } catch(IOException ie) { - LogLog.error("Logging failed while tring to write into Cache File..", ie); - } - LogLog.debug("Setting Cache Writer.."); - cacheWriter = createCacheFileWriter(cacheOstream); - if(bufferedIO) { - cacheWriter = new BufferedWriter(cacheWriter, bufferSize); - } - } - - - private void setFileCacheOstream(String fileCache) throws IOException { - - try { - cacheOstream = new FileOutputStream(fileCache, true); - } catch(FileNotFoundException ex) { - String parentName = new File(fileCache).getParent(); - if (parentName != null) { - File parentDir = new File(parentName); - if(!parentDir.exists() && parentDir.mkdirs()) { - cacheOstream = new FileOutputStream(fileName, true); - } else { - throw ex; - } - } else { - throw ex; - } - } - } - - - public OutputStreamWriter createCacheFileWriter(OutputStream os ) { - OutputStreamWriter retval = null; - - if(encoding != null) { - try { - retval = new OutputStreamWriter(os, encoding); - } catch(IOException ie) { - LogLog.warn("Error initializing output writer."); - LogLog.warn("Unsupported encoding?"); - } - } - if(retval == null) { - retval = new OutputStreamWriter(os); - } - return retval; - } - - - public void writeToCache(LoggingEvent event) { - - try { - LogLog.debug("Writing log to Cache.." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + cacheWriter.toString()); - - cacheWriter.write(this.layout.format(event)); - cacheWriter.flush(); - - if(layout.ignoresThrowable()) { - String[] s = event.getThrowableStrRep(); - if (s != null) { - int len = s.length; - for(int i = 0; i < len; i++) { - LogLog.debug("Log:" + s[i]); - cacheWriter.write(s[i]); - cacheWriter.write(Layout.LINE_SEP); - cacheWriter.flush(); - } - } - } - } catch (IOException ie) { - LogLog.error("Unable to log event message to hdfs:", ie); - } - } - - public void rollOverCacheFile() { - - if (new File(fileCache).length() != 0 ) { - - long epochNow = System.currentTimeMillis(); - - String datedCacheFileName = fileCache + "." + epochNow; - LogLog.debug("Rolling over remaining cache File to new file"+ datedCacheFileName); - closeCacheWriter(); - - File target = new File(datedCacheFileName); - if (target.exists()) { - target.delete(); - } - - File file = new File(fileCache); - - boolean result = file.renameTo(target); - - if(result) { - LogLog.debug(fileCache +" -> "+ datedCacheFileName); - } else { - LogLog.error("Failed to rename cache file ["+fileCache+"] to ["+datedCacheFileName+"]."); - } - } - } - - public void closeCacheWriter() { - try { - if(cacheWriter != null) { - cacheWriter.close(); - cacheWriter = null; - } - } catch(IOException ie) { - LogLog.error("unable to close cache writer", ie); - } - } -} - -/** - * RollingCalendar is a helper class to HdfsFileAppender. Given a periodicity type and the current time, it computes the start of the next interval. - */ - -class RollingCalendar extends GregorianCalendar { - private static final long serialVersionUID = 1L; - - private int type = HdfsFileAppender.TOP_OF_TROUBLE; - - RollingCalendar() { - super(); - } - - RollingCalendar(TimeZone tz, Locale locale) { - super(tz, locale); - } - - void setType(int type) { - this.type = type; - } - - public long getNextCheckMillis(Date now, int offset) { - return getNextCheckDate(now,offset).getTime(); - } - - public Date getNextCheckDate(Date now,int offset) { - this.setTime(now); - - switch(this.type) { - case HdfsFileAppender.TOP_OF_MINUTE: - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - this.add(Calendar.MINUTE, offset); - break; - case HdfsFileAppender.TOP_OF_HOUR: - this.set(Calendar.MINUTE, 0); - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - this.add(Calendar.HOUR_OF_DAY, offset); - break; - case HdfsFileAppender.HALF_DAY: - this.set(Calendar.MINUTE, 0); - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - int hour = get(Calendar.HOUR_OF_DAY); - if(hour < 12) { - this.set(Calendar.HOUR_OF_DAY, 12); - } else { - this.set(Calendar.HOUR_OF_DAY, 0); - this.add(Calendar.DAY_OF_MONTH, 1); - } - break; - case HdfsFileAppender.TOP_OF_DAY: - this.set(Calendar.HOUR_OF_DAY, 0); - this.set(Calendar.MINUTE, 0); - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - this.add(Calendar.DATE, offset); - break; - case HdfsFileAppender.TOP_OF_WEEK: - this.set(Calendar.DAY_OF_WEEK, getFirstDayOfWeek()); - this.set(Calendar.HOUR_OF_DAY, 0); - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - this.add(Calendar.WEEK_OF_YEAR, offset); - break; - case HdfsFileAppender.TOP_OF_MONTH: - this.set(Calendar.DATE, 1); - this.set(Calendar.HOUR_OF_DAY, 0); - this.set(Calendar.SECOND, 0); - this.set(Calendar.MILLISECOND, 0); - this.add(Calendar.MONTH, offset); - break; - default: - throw new IllegalStateException("Unknown periodicity type."); - } - return getTime(); - } - - -} - - -/************* - * Hdfs Sink - * - *************/ - -class HdfsSink { - - private static final String DS_REPLICATION_VAL = "1"; - private static final String DS_REPLICATION_KEY = "dfs.replication"; - private static final String FS_DEFAULT_NAME_KEY = "fs.default.name"; - private Configuration conf = null; - private FileSystem fs= null; - private Path pt = null; - private FSDataOutputStream hdfsostream = null; - private String fsName = null; - private String fileName = null; - private String fileCache = null; - private Layout layout = null; - private String encoding = null; - private Writer hdfswriter = null; - private int bufferSize; - private boolean bufferedIO=false; - private static int fstime=0; - private CacheFileWatcher cfw = null; - private boolean hdfsUpdateAllowed=true; - private String processUser=null; - - - HdfsSink() { - } - - private static final ThreadLocal<HdfsSink> hdfssink = new ThreadLocal<HdfsSink>() { - protected HdfsSink initialValue() { - return new HdfsSink(); - } - }; - - public static HdfsSink getInstance() { - return hdfssink.get(); - } - - public void init(String fileSystemName, String fileName, String fileCache,boolean append, boolean bufferedIO, int bufferSize, Layout layout, String encoding, String scheduledCacheFile, Writer cacheWriter, boolean hdfsUpdateAllowed, String processUser) throws Exception{ - - this.fsName=fileSystemName; - this.fileName=fileName; - this.layout=layout; - this.encoding=encoding; - this.bufferSize=bufferSize; - this.bufferedIO=bufferedIO; - this.fileCache=fileCache; - this.hdfsUpdateAllowed=hdfsUpdateAllowed; - this.processUser=processUser; - - final Configuration conf= new Configuration(); - conf.set(DS_REPLICATION_KEY,DS_REPLICATION_VAL); - conf.set(FS_DEFAULT_NAME_KEY, fsName); - - try { - if ( fs == null) { - LogLog.debug("Opening Connection to hdfs Sytem" + this.fsName); - - UserGroupInformation ugi = UserGroupInformation.createProxyUser(this.processUser, UserGroupInformation.getLoginUser()); - fs = ugi.doAs( new PrivilegedExceptionAction<FileSystem>() { - public FileSystem run() throws Exception { - FileSystem filesystem = FileSystem.get(conf); - LogLog.debug("Inside UGI.." + fsName + " " + filesystem); - return filesystem; - } - }); - - if ( cfw == null) { - // Start the CacheFileWatcher to move the Cache file. - - LogLog.debug("About to run CacheFilWatcher..."); - Path hdfsfilePath = getParent(); - cfw = new CacheFileWatcher(this.fs,this.fileCache,hdfsfilePath,cacheWriter,this.hdfsUpdateAllowed,conf); - cfw.start(); - } - - } - - } catch(Exception ie) { - - LogLog.error("Unable to Create hdfs logfile:" + ie.getMessage()); - throw ie; - } - - LogLog.debug("HdfsSystem up: " + fsName + "FS Object:" + fs); - } - - public int getfstime() { - return fstime; - } - public FileSystem getFileSystem() { - return fs; - } - - public Path getPath() { - return pt; - } - - public Path getParent() { - Path pt = new Path(this.fileName); - return pt.getParent(); - } - - public void setOsteam() throws IOException { - try { - pt = new Path(this.fileName); - // if file Exist append it - if(fs.exists(pt)) { - LogLog.debug("Appending File: "+ this.fsName+":"+this.fileName+fs); - if (hdfsostream !=null) { - hdfsostream.close(); - } - hdfsostream=fs.append(pt); - - } else { - LogLog.debug("Creating File directories in hdfs if not present.."+ this.fsName+":"+this.fileName + fs); - String parentName = new Path(this.fileName).getParent().toString(); - if(parentName != null) { - Path parentDir = new Path(parentName); - if (!fs.exists(parentDir) ) { - LogLog.debug("Creating Parent Directory: " + parentDir ); - fs.mkdirs(parentDir); - } - } - hdfsostream = fs.create(pt); - } - } catch (IOException ie) { - LogLog.debug("Error While appending hdfsd file." + ie); - throw ie; - } - } - - public void setWriter() { - LogLog.debug("Setting Writer.."); - hdfswriter = createhdfsWriter(hdfsostream); - if(bufferedIO) { - hdfswriter = new BufferedWriter(hdfswriter, bufferSize); - } - } - - public Writer getWriter() { - return hdfswriter; - } - - public void append(LoggingEvent event) throws IOException { - try { - LogLog.debug("Writing log to HDFS." + "layout: "+ this.layout.format(event) + "ignoresThowable: "+layout.ignoresThrowable() + "Writer:" + hdfswriter.toString()); - - hdfswriter.write(this.layout.format(event)); - hdfswriter.flush(); - if(layout.ignoresThrowable()) { - String[] s = event.getThrowableStrRep(); - if (s != null) { - int len = s.length; - for(int i = 0; i < len; i++) { - LogLog.debug("Log:" + s[i]); - hdfswriter.write(s[i]); - hdfswriter.write(Layout.LINE_SEP); - hdfswriter.flush(); - } - } - } - } catch (IOException ie) { - LogLog.error("Unable to log event message to hdfs:", ie); - throw ie; - } - } - - public void writeHeader() throws IOException { - LogLog.debug("Writing log header..."); - try { - if(layout != null) { - String h = layout.getHeader(); - if(h != null && hdfswriter != null) - LogLog.debug("Log header:" + h); - hdfswriter.write(h); - hdfswriter.flush(); - } - } catch (IOException ie) { - LogLog.error("Unable to log header message to hdfs:", ie); - throw ie; - } - } - - public - void writeFooter() throws IOException{ - LogLog.debug("Writing footer header..."); - try { - if(layout != null) { - String f = layout.getFooter(); - if(f != null && hdfswriter != null) { - LogLog.debug("Log:" + f); - hdfswriter.write(f); - hdfswriter.flush(); - } - } - } catch (IOException ie) { - LogLog.debug("Unable to log header message to hdfs:", ie); - throw ie; - } - - } - - public void closeHdfsOstream() { - try { - if(this.hdfsostream != null) { - this.hdfsostream.close(); - this.hdfsostream = null; - } - } catch(IOException ie) { - LogLog.error("unable to close output stream", ie); - } - - } - - public void closeHdfsWriter() { - try { - if(hdfswriter != null) { - hdfswriter.close(); - hdfswriter = null; - } - } catch(IOException ie) { - LogLog.error("unable to hfds writer", ie); - } - } - - public void closeHdfsSink() { - try { - if (fs !=null) { - fs.close(); - } - } catch (IOException ie) { - LogLog.error("Unable to close hdfs " + fs ,ie); - } - } - - - public OutputStreamWriter createhdfsWriter(FSDataOutputStream os ) { - OutputStreamWriter retval = null; - - if(encoding != null) { - try { - retval = new OutputStreamWriter(os, encoding); - } catch(IOException ie) { - LogLog.warn("Error initializing output writer."); - LogLog.warn("Unsupported encoding?"); - } - } - if(retval == null) { - retval = new OutputStreamWriter(os); - } - return retval; - } - - - } - - -// CacheFileWatcher Thread - -class CacheFileWatcher extends Thread { - - long CACHEFILE_WATCHER_SLEEP_TIME = 1000*60*2; - - Configuration conf = null; - private FileSystem fs = null; - private String cacheFile = null; - private File parentDir = null; - private File[] files = null; - private Path fsPath = null; - private Path hdfsfilePath = null; - private Writer cacheWriter = null; - - private boolean hdfsUpdateAllowed=true; - private boolean cacheFilesCopied = false; - - CacheFileWatcher(FileSystem fs, String cacheFile, Path hdfsfilePath, Writer cacheWriter, boolean hdfsUpdateAllowed, Configuration conf) { - this.fs = fs; - this.cacheFile = cacheFile; - this.conf = conf; - this.hdfsfilePath = hdfsfilePath; - this.cacheWriter = cacheWriter; - this.hdfsUpdateAllowed = hdfsUpdateAllowed; - } - - - public void run(){ - - LogLog.debug("CacheFileWatcher Started"); - - while (!cacheFilesCopied ){ - - if (hdfsUpdateAllowed) { - rollRemainingCacheFile(); - } - - if ( !cacheFilePresent(cacheFile) ) { - - try { - Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME); - } catch (InterruptedException ie) { - LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie); - } - } else { - try { - copyCacheFilesToHdfs(); - if (hdfsUpdateAllowed) { - cacheFilesCopied = true; - } else { - cacheFilesCopied = false; - } - } catch (Throwable t) { - // Error While copying the file to hdfs and thread goes for sleep and check later - cacheFilesCopied = false; - LogLog. error("Error while copying Cache Files to hdfs..Sleeping for next try",t); - - try { - Thread.sleep(CACHEFILE_WATCHER_SLEEP_TIME); - } catch (InterruptedException ie) { - LogLog.error("Unable to complete the CatchFileWatcher Sleep", ie); - } - } - } - } - } - - public boolean cacheFilePresent(String filename) { - String parent = new File(filename).getParent(); - if ( parent != null ) { - parentDir = new File(parent); - fsPath = new Path(parent); - files = parentDir.listFiles(new FilenameFilter() { - @Override - public boolean accept(File parentDir, String name) { - return name.matches(".*cache.+"); - } - }); - if ( files.length > 0) { - LogLog.debug("CacheFile Present.."); - return true; - } - } - return false; - } - - - public void copyCacheFilesToHdfs() throws Throwable{ - - try { - - if (!fs.exists(hdfsfilePath) ) { - LogLog.debug("Creating Parent Directory: " + hdfsfilePath ); - fs.mkdirs(hdfsfilePath); - } - } catch ( Throwable t) { - throw t; - } - - - for ( File cacheFile : files) { - try { - LogLog.debug("Copying Files..." + "File Path: " + fsPath + "CacheFile: " +cacheFile + "HDFS Path:" + hdfsfilePath); - FileUtil.copy(cacheFile, this.fs, this.hdfsfilePath, true, this.conf); - } catch (Throwable t) { - - throw t; - } - } - } - - public void rollRemainingCacheFile() { - String datePattern = "'.'yyyy-MM-dd-HH-mm"; - SimpleDateFormat sdf = new SimpleDateFormat(datePattern); - if (new File(cacheFile).length() != 0 ) { - long epochNow = System.currentTimeMillis(); - - String datedCacheFileName = cacheFile + sdf.format(epochNow); - - LogLog.debug("Rolling over remaining cache File "+ datedCacheFileName); - closeCacheFile(); - - File target = new File(datedCacheFileName); - if (target.exists()) { - target.delete(); - } - - File file = new File(cacheFile); - - boolean result = file.renameTo(target); - - if(result) { - LogLog.debug(cacheFile +" -> "+ datedCacheFileName); - } else { - LogLog.error("Failed to rename cache file ["+cacheFile+"] to ["+datedCacheFileName+"]."); - } - - } - } - - public void closeCacheFile() { - try { - if(cacheWriter != null) { - cacheWriter.close(); - cacheWriter = null; - } - } catch(IOException ie) { - LogLog.error("unable to close cache writer", ie); - } - } -} - -
