This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git
The following commit(s) were added to refs/heads/master by this push:
new 543a7ea75c [INLONG-9286][Agent] Adjust the time offset calculation
function (#9288)
543a7ea75c is described below
commit 543a7ea75cbe902943b461e33615455f42e3eade
Author: justinwwhuang <[email protected]>
AuthorDate: Wed Nov 15 10:48:49 2023 +0800
[INLONG-9286][Agent] Adjust the time offset calculation function (#9288)
---
.../apache/inlong/agent/utils/DateTransUtils.java | 119 +++++++++++++++++++++
.../agent/plugin/task/filecollect/FileScanner.java | 22 ++--
.../task/filecollect/LogFileCollectTask.java | 13 ++-
.../agent/plugin/task/filecollect/WatchEntity.java | 3 +-
.../agent/plugin/utils/file/NewDateUtils.java | 119 ++++-----------------
.../inlong/agent/plugin/utils/TestUtils.java | 4 +-
6 files changed, 163 insertions(+), 117 deletions(-)
diff --git
a/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
new file mode 100644
index 0000000000..ced881c3f7
--- /dev/null
+++
b/inlong-agent/agent-common/src/main/java/org/apache/inlong/agent/utils/DateTransUtils.java
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.inlong.agent.utils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.TimeZone;
+
+public class DateTransUtils {
+ // Static helpers converting between epoch milliseconds and compact time strings (yyyy ... yyyyMMddHHmm) selected by a cycle unit
+ private static final Logger logger =
LoggerFactory.getLogger(DateTransUtils.class);
+
+ // Formats epoch milliseconds as a time string (yyyy, yyyyMM, yyyyMMdd, yyyyMMddHH or yyyyMMddHHmm) chosen by cycleUnit, in the JVM default time zone
+ public static String millSecConvertToTimeStr(long time, String cycleUnit) {
+ return millSecConvertToTimeStr(time, cycleUnit, TimeZone.getDefault());
+ }
+
+ // Parses a time string (yyyy, yyyyMM, yyyyMMdd, yyyyMMddHH or yyyyMMddHHmm) into epoch milliseconds by cycleUnit, in the JVM default time zone
+ public static long timeStrConvertTomillSec(String time, String cycleUnit)
+ throws ParseException {
+ return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault());
+ }
+ // Parses time into epoch milliseconds in timeZone; throws ParseException when cycleUnit and time.length() match no supported pattern
+ public static long timeStrConvertTomillSec(String time, String cycleUnit,
TimeZone timeZone)
+ throws ParseException {
+ long retTime = 0; // returned unchanged when df.parse fails below (that error is only logged)
+ // "Y", "M" and "D" are compared case-sensitively, "h" ignores case, and any unit containing "m" means minutes
+ SimpleDateFormat df = null;
+ if (cycleUnit.equals("Y") && time.length() == 4) {
+ df = new SimpleDateFormat("yyyy");
+ } else if (cycleUnit.equals("M") && time.length() == 6) {
+ df = new SimpleDateFormat("yyyyMM");
+ } else if (cycleUnit.equals("D") && time.length() == 8) {
+ df = new SimpleDateFormat("yyyyMMdd");
+ } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
+ df = new SimpleDateFormat("yyyyMMddHH");
+ } else if (cycleUnit.contains("m") && time.length() == 12) {
+ df = new SimpleDateFormat("yyyyMMddHHmm");
+ } else {
+ logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit);
+ throw new ParseException(time, 0);
+ }
+ try {
+ df.setTimeZone(timeZone);
+ retTime = df.parse(time).getTime();
+ if (cycleUnit.equals("10m")) {
+ // NOTE(review): empty branch, "10m" gets no special handling here
+ }
+ } catch (ParseException e) {
+ logger.error("convert time string error. ", e);
+ }
+ return retTime;
+ }
+
+ // Formats epoch milliseconds as a time string in tz; an unrecognized cycleUnit is logged and formatted as yyyyMMddHH
+ public static String millSecConvertToTimeStr(long time, String cycleUnit,
TimeZone tz) {
+ String retTime = null;
+ // The Calendar only serves to turn the epoch millis into a Date for the formatter
+ Calendar calendarInstance = Calendar.getInstance();
+ calendarInstance.setTimeInMillis(time);
+
+ Date dateTime = calendarInstance.getTime();
+ SimpleDateFormat df = null;
+ if ("Y".equalsIgnoreCase(cycleUnit)) {
+ df = new SimpleDateFormat("yyyy");
+ } else if ("M".equals(cycleUnit)) {
+ df = new SimpleDateFormat("yyyyMM");
+ } else if ("D".equalsIgnoreCase(cycleUnit)) {
+ df = new SimpleDateFormat("yyyyMMdd");
+ } else if ("h".equalsIgnoreCase(cycleUnit)) {
+ df = new SimpleDateFormat("yyyyMMddHH");
+ } else if (cycleUnit.contains("m")) {
+ df = new SimpleDateFormat("yyyyMMddHHmm");
+ } else {
+ logger.error("cycleUnit {} can't parse!", cycleUnit);
+ df = new SimpleDateFormat("yyyyMMddHH");
+ }
+ df.setTimeZone(tz);
+ retTime = df.format(dateTime);
+ // Minute cycles ("<N>m"): round the minute field down to a multiple of N, e.g. "10m" at minute 37 gives 30
+ if (cycleUnit.contains("m")) {
+ // NOTE(review): "0" is prepended only when the aligned minute is 0, so e.g. "5m" at minute 07 yields a single-digit "5"
+ int cycleNum = Integer.parseInt(cycleUnit.substring(0,
+ cycleUnit.length() - 1));
+ int mmTime = Integer.parseInt(retTime.substring(
+ retTime.length() - 2, retTime.length()));
+ String realMMTime = "";
+ if (cycleNum * (mmTime / cycleNum) <= 0) {
+ realMMTime = "0" + cycleNum * (mmTime / cycleNum);
+ } else {
+ realMMTime = "" + cycleNum * (mmTime / cycleNum);
+ }
+ retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
+ }
+
+ return retTime;
+ }
+
+}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
index efa87e22dc..8f7d2d9d80 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/FileScanner.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.FileTimeComparator;
import org.apache.inlong.agent.plugin.utils.file.Files;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -57,20 +58,19 @@ public class FileScanner {
private static final Logger logger =
LoggerFactory.getLogger(FileScanner.class);
- public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, long failTime,
- long recoverTime, boolean isRetry) {
+ public static List<BasicFileInfo> scanTaskBetweenTimes(TaskProfile conf,
String originPattern, long startTime,
+ long endTime, boolean isRetry) {
String cycleUnit = conf.getCycleUnit();
if (!isRetry) {
- failTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
- recoverTime -= NewDateUtils.calcOffset(conf.getTimeOffset());
+ startTime += NewDateUtils.calcOffset(conf.getTimeOffset());
+ endTime += NewDateUtils.calcOffset(conf.getTimeOffset());
}
-
- String startTime = NewDateUtils.millSecConvertToTimeStr(failTime,
cycleUnit);
- String endTime = NewDateUtils.millSecConvertToTimeStr(recoverTime,
cycleUnit);
+ String strStartTime =
DateTransUtils.millSecConvertToTimeStr(startTime, cycleUnit);
+ String strEndTime = DateTransUtils.millSecConvertToTimeStr(endTime,
cycleUnit);
logger.info("task {} this scan time is between {} and {}.",
- new Object[]{conf.getTaskId(), startTime, endTime});
+ new Object[]{conf.getTaskId(), strStartTime, strEndTime});
- return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern,
startTime, endTime);
+ return scanTaskBetweenTimes(conf.getCycleUnit(), originPattern,
strStartTime, strEndTime);
}
/* Scan log files and create tasks between two times. */
@@ -89,10 +89,10 @@ public class FileScanner {
DEFAULT_FILE_MAX_NUM);
for (String file : fileList) {
// TODO the time is not YYYYMMDDHH
- String dataTime = NewDateUtils.millSecConvertToTimeStr(time,
cycleUnit);
+ String dataTime = DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit);
BasicFileInfo info = new BasicFileInfo(file, dataTime);
logger.info("scan new task fileName {} ,dataTime {}", file,
- NewDateUtils.millSecConvertToTimeStr(time, cycleUnit));
+ DateTransUtils.millSecConvertToTimeStr(time,
cycleUnit));
infos.add(info);
}
}
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
index 26c61efa7a..dc183881c1 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/LogFileCollectTask.java
@@ -80,6 +80,8 @@ public class LogFileCollectTask extends Task {
public static final long DAY_TIMEOUT_INTERVAL = 2 * 24 * 3600 * 1000;
public static final int CORE_THREAD_SLEEP_TIME = 1000;
public static final int CORE_THREAD_MAX_GAP_TIME_MS = 60 * 1000;
+ public static final int CORE_THREAD_PRINT_TIME = 10000;
+ private long lastPrintTime = 0;
private boolean retry;
private long startTime;
private long endTime;
@@ -221,6 +223,9 @@ public class LogFileCollectTask extends Task {
@Override
public String getTaskId() {
+ if (taskProfile == null) {
+ return "";
+ }
return taskProfile.getTaskId();
}
@@ -234,6 +239,10 @@ public class LogFileCollectTask extends Task {
Thread.currentThread().setName("directory-task-core-" + getTaskId());
running = true;
while (!isFinished()) {
+ if (AgentUtils.getCurrentTime() - lastPrintTime >
CORE_THREAD_PRINT_TIME) {
+ LOGGER.info("log file task running! taskId {}", getTaskId());
+ lastPrintTime = AgentUtils.getCurrentTime();
+ }
coreThreadUpdateTime = AgentUtils.getCurrentTime();
AgentUtils.silenceSleepInMs(CORE_THREAD_SLEEP_TIME);
if (!initOK) {
@@ -274,7 +283,7 @@ public class LogFileCollectTask extends Task {
private void scanExistingFile() {
originPatterns.forEach((originPattern) -> {
List<BasicFileInfo> fileInfos =
scanExistingFileByPattern(originPattern);
- LOGGER.info("scan {} get file count {}", originPattern,
fileInfos.size());
+ LOGGER.info("taskId {} scan {} get file count {}", getTaskId(),
originPattern, fileInfos.size());
fileInfos.forEach((fileInfo) -> {
addToEvenMap(fileInfo.fileName, fileInfo.dataTime);
});
@@ -299,7 +308,7 @@ public class LogFileCollectTask extends Task {
long currentTime = System.currentTimeMillis();
// only scan two cycle, like two hours or two days
long offset = NewDateUtils.calcOffset("-2" +
taskProfile.getCycleUnit());
- startScanTime = currentTime - offset;
+ startScanTime = currentTime + offset;
endScanTime = currentTime;
}
return FileScanner.scanTaskBetweenTimes(taskProfile, originPattern,
startScanTime, endScanTime, retry);
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
index e0c6f464c6..8fb9755716 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/task/filecollect/WatchEntity.java
@@ -22,6 +22,7 @@ import org.apache.inlong.agent.plugin.utils.file.FilePathUtil;
import org.apache.inlong.agent.plugin.utils.file.NewDateUtils;
import org.apache.inlong.agent.plugin.utils.file.NonRegexPatternPosition;
import org.apache.inlong.agent.plugin.utils.file.PathDateExpression;
+import org.apache.inlong.agent.utils.DateTransUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -273,7 +274,7 @@ public class WatchEntity {
logger.info("removeUselessWatchDirectories {}", curDataTime);
/* Calculate the data time which is 3 cycle units earlier than current
task data time. */
- long curDataTimeMillis =
NewDateUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
+ long curDataTimeMillis =
DateTransUtils.timeStrConvertTomillSec(curDataTime, cycleUnit);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(curDataTimeMillis);
if ("D".equalsIgnoreCase(cycleUnit)) {
diff --git
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
index c6d8082651..b06478558d 100644
---
a/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
+++
b/inlong-agent/agent-plugins/src/main/java/org/apache/inlong/agent/plugin/utils/file/NewDateUtils.java
@@ -17,6 +17,8 @@
package org.apache.inlong.agent.plugin.utils.file;
+import org.apache.inlong.agent.utils.DateTransUtils;
+
import hirondelle.date4j.DateTime;
import org.apache.commons.lang.StringUtils;
import org.slf4j.Logger;
@@ -119,10 +121,10 @@ public class NewDateUtils {
}
public static String getDateTime(String dataTime, String cycleUnit, String
offset) {
- String retTime = NewDateUtils.millSecConvertToTimeStr(
+ String retTime = DateTransUtils.millSecConvertToTimeStr(
System.currentTimeMillis(), cycleUnit);
try {
- long time = NewDateUtils.timeStrConvertTomillSec(dataTime,
cycleUnit);
+ long time = DateTransUtils.timeStrConvertTomillSec(dataTime,
cycleUnit);
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
@@ -131,7 +133,7 @@ public class NewDateUtils {
return dataTime;
}
- retTime =
NewDateUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
+ retTime =
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
cycleUnit);
} catch (Exception e) {
logger.error("getDateTime error: ", e);
@@ -143,7 +145,7 @@ public class NewDateUtils {
Calendar calendar = Calendar.getInstance();
calendar.setTimeInMillis(time);
Calendar retCalendar = getDateTime(calendar, cycleUnit, offset);
- return
NewDateUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
cycleUnit);
+ return
DateTransUtils.millSecConvertToTimeStr(retCalendar.getTime().getTime(),
cycleUnit);
}
private static Calendar getDateTime(Calendar calendar, String cycleUnit,
String offset) {
@@ -220,11 +222,10 @@ public class NewDateUtils {
timeInterval = DAY_TIMEOUT_INTERVAL;
}
- // To handle the offset, add the time offset to the timeout period
if (timeOffset.startsWith("-")) {
- timeInterval += calcOffset(timeOffset);
- } else { // Process Backward Offset
timeInterval -= calcOffset(timeOffset);
+ } else {
+ timeInterval += calcOffset(timeOffset);
}
return isValidCreationTime(dataTime, timeInterval);
@@ -242,14 +243,16 @@ public class NewDateUtils {
*/
public static long calcOffset(String timeOffset) {
String offsetUnit = timeOffset.substring(timeOffset.length() - 1);
- int startIndex = timeOffset.charAt(0) == '-' ? 1 : 0;
- // Default Backward Offset
- int symbol = 1;
- if (startIndex == 1) {
- symbol = 1;
- } else if (startIndex == 0) { // Forward offset
+ int startIndex;
+ int symbol;
+ if (timeOffset.charAt(0) == '-') {
symbol = -1;
+ startIndex = 1;
+ } else {
+ symbol = 1;
+ startIndex = 0;
}
+
String strOffset = timeOffset.substring(startIndex,
timeOffset.length() - 1);
if (strOffset.length() == 0) {
return 0;
@@ -294,92 +297,6 @@ public class NewDateUtils {
&& calendar.getTimeInMillis() <= maxTime;
}
- // convert millSec to YYYMMDD by cycleUnit
- public static String millSecConvertToTimeStr(long time, String cycleUnit,
TimeZone tz) {
- String retTime = null;
-
- Calendar calendarInstance = Calendar.getInstance();
- calendarInstance.setTimeInMillis(time);
-
- Date dateTime = calendarInstance.getTime();
- SimpleDateFormat df = null;
- if ("Y".equalsIgnoreCase(cycleUnit)) {
- df = new SimpleDateFormat("yyyy");
- } else if ("M".equals(cycleUnit)) {
- df = new SimpleDateFormat("yyyyMM");
- } else if ("D".equalsIgnoreCase(cycleUnit)) {
- df = new SimpleDateFormat("yyyyMMdd");
- } else if ("h".equalsIgnoreCase(cycleUnit)) {
- df = new SimpleDateFormat("yyyyMMddHH");
- } else if (cycleUnit.contains("m")) {
- df = new SimpleDateFormat("yyyyMMddHHmm");
- } else {
- logger.error("cycleUnit {} can't parse!", cycleUnit);
- df = new SimpleDateFormat("yyyyMMddHH");
- }
- df.setTimeZone(tz);
- retTime = df.format(dateTime);
-
- if (cycleUnit.contains("m")) {
-
- int cycleNum = Integer.parseInt(cycleUnit.substring(0,
- cycleUnit.length() - 1));
- int mmTime = Integer.parseInt(retTime.substring(
- retTime.length() - 2, retTime.length()));
- String realMMTime = "";
- if (cycleNum * (mmTime / cycleNum) <= 0) {
- realMMTime = "0" + cycleNum * (mmTime / cycleNum);
- } else {
- realMMTime = "" + cycleNum * (mmTime / cycleNum);
- }
- retTime = retTime.substring(0, retTime.length() - 2) + realMMTime;
- }
-
- return retTime;
- }
-
- // convert millSec to YYYMMDD by cycleUnit
- public static String millSecConvertToTimeStr(long time, String cycleUnit) {
- return millSecConvertToTimeStr(time, cycleUnit, TimeZone.getDefault());
- }
-
- // convert YYYMMDD to millSec by cycleUnit
- public static long timeStrConvertTomillSec(String time, String cycleUnit)
- throws ParseException {
- return timeStrConvertTomillSec(time, cycleUnit, TimeZone.getDefault());
- }
-
- public static long timeStrConvertTomillSec(String time, String cycleUnit,
TimeZone timeZone)
- throws ParseException {
- long retTime = 0;
- // SimpleDateFormat df=new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
- SimpleDateFormat df = null;
- if (cycleUnit.equals("Y") && time.length() == 4) {
- df = new SimpleDateFormat("yyyy");
- } else if (cycleUnit.equals("M") && time.length() == 6) {
- df = new SimpleDateFormat("yyyyMM");
- } else if (cycleUnit.equals("D") && time.length() == 8) {
- df = new SimpleDateFormat("yyyyMMdd");
- } else if (cycleUnit.equalsIgnoreCase("h") && time.length() == 10) {
- df = new SimpleDateFormat("yyyyMMddHH");
- } else if (cycleUnit.contains("m") && time.length() == 12) {
- df = new SimpleDateFormat("yyyyMMddHHmm");
- } else {
- logger.error("time {},cycleUnit {} can't parse!", time, cycleUnit);
- throw new ParseException(time, 0);
- }
- try {
- df.setTimeZone(timeZone);
- retTime = df.parse(time).getTime();
- if (cycleUnit.equals("10m")) {
-
- }
- } catch (ParseException e) {
- logger.error("convert time string error. ", e);
- }
- return retTime;
- }
-
public static boolean isBraceContain(String dataName) {
Matcher matcher = bracePatt.matcher(dataName);
return matcher.find();
@@ -675,8 +592,8 @@ public class NewDateUtils {
long startTime;
long endTime;
try {
- startTime = NewDateUtils.timeStrConvertTomillSec(start, cycleUnit);
- endTime = NewDateUtils.timeStrConvertTomillSec(end, cycleUnit);
+ startTime = DateTransUtils.timeStrConvertTomillSec(start,
cycleUnit);
+ endTime = DateTransUtils.timeStrConvertTomillSec(end, cycleUnit);
} catch (ParseException e) {
logger.error("date format is error: ", e);
return ret;
diff --git
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
index c6ff2d6642..81d2433349 100644
---
a/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
+++
b/inlong-agent/agent-plugins/src/test/java/org/apache/inlong/agent/plugin/utils/TestUtils.java
@@ -46,8 +46,8 @@ public class TestUtils {
@Test
public void testCalcOffset() {
- Assert.assertTrue(NewDateUtils.calcOffset("-1h") == 3600 * 1000);
- Assert.assertTrue(NewDateUtils.calcOffset("1D") == -24 * 3600 * 1000);
+ Assert.assertTrue(NewDateUtils.calcOffset("-1h") == -3600 * 1000);
+ Assert.assertTrue(NewDateUtils.calcOffset("1D") == 24 * 3600 * 1000);
Assert.assertTrue(NewDateUtils.calcOffset("0") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("1") == 0);
Assert.assertTrue(NewDateUtils.calcOffset("10") == 0);